]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Refactor logical worker synchronization code into a separate file.
authorAmit Kapila <akapila@postgresql.org>
Thu, 16 Oct 2025 05:10:50 +0000 (05:10 +0000)
committerAmit Kapila <akapila@postgresql.org>
Thu, 16 Oct 2025 05:10:50 +0000 (05:10 +0000)
To support the upcoming addition of a sequence synchronization worker,
this patch extracts common synchronization logic shared by table sync
workers and the new sequence sync worker into a dedicated file. This
modularization improves code reuse, maintainability, and clarity in the
logical workers framework.

Author: vignesh C <vignesh21@gmail.com>
Author: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com

13 files changed:
src/backend/catalog/pg_subscription.c
src/backend/replication/logical/Makefile
src/backend/replication/logical/applyparallelworker.c
src/backend/replication/logical/meson.build
src/backend/replication/logical/syncutils.c [new file with mode: 0644]
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/bin/pg_dump/common.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/include/catalog/pg_subscription_rel.h
src/include/replication/worker_internal.h
src/tools/pgindent/typedefs.list

index b885890de373863f047e37d2bc138b013cdc34a1..e06587b02650d2b111a2c81904fdc159ea162412 100644 (file)
@@ -506,13 +506,13 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 }
 
 /*
- * Does the subscription have any relations?
+ * Does the subscription have any tables?
  *
  * Use this function only to know true/false, and when you have no need for the
  * List returned by GetSubscriptionRelations.
  */
 bool
-HasSubscriptionRelations(Oid subid)
+HasSubscriptionTables(Oid subid)
 {
        Relation        rel;
        ScanKeyData skey[1];
index 1e08bbbd4eb15496a61317e02c662dc79feb3354..c62c8c67521c89a503beb9c5ef159e767e77c112 100644 (file)
@@ -28,6 +28,7 @@ OBJS = \
        reorderbuffer.o \
        slotsync.o \
        snapbuild.o \
+       syncutils.o \
        tablesync.o \
        worker.o
 
index 33b7ec7f029c35c1ff31343d74f36ba64bd04d8e..14325581afc159b2046de35b7b753977f841ba6f 100644 (file)
@@ -970,7 +970,7 @@ ParallelApplyWorkerMain(Datum main_arg)
         * the subscription relation state.
         */
        CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
-                                                                 invalidate_syncing_table_states,
+                                                                 InvalidateSyncingRelStates,
                                                                  (Datum) 0);
 
        set_apply_error_context_origin(originname);
index 6f19614c79d8f0f2c901beba8c7aaabf7352045d..9283e996ef4ae0152c4a824baee20c58c37ce2d5 100644 (file)
@@ -14,6 +14,7 @@ backend_sources += files(
   'reorderbuffer.c',
   'slotsync.c',
   'snapbuild.c',
+  'syncutils.c',
   'tablesync.c',
   'worker.c',
 )
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
new file mode 100644 (file)
index 0000000..1bb3ca0
--- /dev/null
@@ -0,0 +1,187 @@
+/*-------------------------------------------------------------------------
+ * syncutils.c
+ *       PostgreSQL logical replication: common synchronization code
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/logical/syncutils.c
+ *
+ * NOTES
+ *       This file contains code common for synchronization workers.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "catalog/pg_subscription_rel.h"
+#include "pgstat.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+
+/*
+ * Enum for phases of the subscription relations state.
+ *
+ * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
+ * state is no longer valid, and the subscription relations should be rebuilt.
+ *
+ * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
+ * relations state is being rebuilt.
+ *
+ * SYNC_RELATIONS_STATE_VALID indicates that the subscription relation state is
+ * up-to-date and valid.
+ */
+typedef enum
+{
+       SYNC_RELATIONS_STATE_NEEDS_REBUILD,
+       SYNC_RELATIONS_STATE_REBUILD_STARTED,
+       SYNC_RELATIONS_STATE_VALID,
+} SyncingRelationsState;
+
+static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
+
+/*
+ * Exit routine for synchronization worker.
+ */
+pg_noreturn void
+FinishSyncWorker(void)
+{
+       /*
+        * Commit any outstanding transaction. This is the usual case, unless
+        * there was nothing to do for the table.
+        */
+       if (IsTransactionState())
+       {
+               CommitTransactionCommand();
+               pgstat_report_stat(true);
+       }
+
+       /* And flush all writes. */
+       XLogFlush(GetXLogWriteRecPtr());
+
+       StartTransactionCommand();
+       ereport(LOG,
+                       (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+                                       MySubscription->name,
+                                       get_rel_name(MyLogicalRepWorker->relid))));
+       CommitTransactionCommand();
+
+       /* Find the leader apply worker and signal it. */
+       logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+
+       /* Stop gracefully */
+       proc_exit(0);
+}
+
+/*
+ * Callback from syscache invalidation.
+ */
+void
+InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
+{
+       relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
+}
+
+/*
+ * Process possible state change(s) of relations that are being synchronized.
+ */
+void
+ProcessSyncingRelations(XLogRecPtr current_lsn)
+{
+       switch (MyLogicalRepWorker->type)
+       {
+               case WORKERTYPE_PARALLEL_APPLY:
+
+                       /*
+                        * Skip for parallel apply workers because they only operate on
+                        * tables that are in a READY state. See pa_can_start() and
+                        * should_apply_changes_for_rel().
+                        */
+                       break;
+
+               case WORKERTYPE_TABLESYNC:
+                       ProcessSyncingTablesForSync(current_lsn);
+                       break;
+
+               case WORKERTYPE_APPLY:
+                       ProcessSyncingTablesForApply(current_lsn);
+                       break;
+
+               case WORKERTYPE_UNKNOWN:
+                       /* Should never happen. */
+                       elog(ERROR, "Unknown worker type");
+       }
+}
+
+/*
+ * Common code to fetch the up-to-date sync state info into the static lists.
+ *
+ * Returns true if subscription has 1 or more tables, else false.
+ *
+ * Note: If this function started the transaction (indicated by the parameter)
+ * then it is the caller's responsibility to commit it.
+ */
+bool
+FetchRelationStates(bool *started_tx)
+{
+       static bool has_subtables = false;
+
+       *started_tx = false;
+
+       if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
+       {
+               MemoryContext oldctx;
+               List       *rstates;
+               ListCell   *lc;
+               SubscriptionRelState *rstate;
+
+               relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
+
+               /* Clean the old lists. */
+               list_free_deep(table_states_not_ready);
+               table_states_not_ready = NIL;
+
+               if (!IsTransactionState())
+               {
+                       StartTransactionCommand();
+                       *started_tx = true;
+               }
+
+               /* Fetch tables and sequences that are in non-ready state. */
+               rstates = GetSubscriptionRelations(MySubscription->oid, true);
+
+               /* Allocate the tracking info in a permanent memory context. */
+               oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+               foreach(lc, rstates)
+               {
+                       rstate = palloc(sizeof(SubscriptionRelState));
+                       memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+                       table_states_not_ready = lappend(table_states_not_ready, rstate);
+               }
+               MemoryContextSwitchTo(oldctx);
+
+               /*
+                * Does the subscription have tables?
+                *
+                * If there were not-READY tables found then we know it does. But if
+                * table_states_not_ready was empty we still need to check again to
+                * see if there are 0 tables.
+                */
+               has_subtables = (table_states_not_ready != NIL) ||
+                       HasSubscriptionTables(MySubscription->oid);
+
+               /*
+                * If the subscription relation cache has been invalidated since we
+                * entered this routine, we still use and return the relations we just
+                * finished constructing, to avoid infinite loops, but we leave the
+                * table states marked as stale so that we'll rebuild it again on next
+                * access. Otherwise, we mark the table states as valid.
+                */
+               if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
+                       relation_states_validity = SYNC_RELATIONS_STATE_VALID;
+       }
+
+       return has_subtables;
+}
index e6da4028d392e89fe72508cf82db363547bfb084..2ba12517e93b6696dc55c9d9aeafdcb68b422465 100644 (file)
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
-#include "utils/memutils.h"
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/usercontext.h"
 
-typedef enum
-{
-       SYNC_TABLE_STATE_NEEDS_REBUILD,
-       SYNC_TABLE_STATE_REBUILD_STARTED,
-       SYNC_TABLE_STATE_VALID,
-} SyncingTablesState;
-
-static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
-static List *table_states_not_ready = NIL;
-static bool FetchTableStates(bool *started_tx);
+List      *table_states_not_ready = NIL;
 
 static StringInfo copybuf = NULL;
 
-/*
- * Exit routine for synchronization worker.
- */
-pg_noreturn static void
-finish_sync_worker(void)
-{
-       /*
-        * Commit any outstanding transaction. This is the usual case, unless
-        * there was nothing to do for the table.
-        */
-       if (IsTransactionState())
-       {
-               CommitTransactionCommand();
-               pgstat_report_stat(true);
-       }
-
-       /* And flush all writes. */
-       XLogFlush(GetXLogWriteRecPtr());
-
-       StartTransactionCommand();
-       ereport(LOG,
-                       (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
-                                       MySubscription->name,
-                                       get_rel_name(MyLogicalRepWorker->relid))));
-       CommitTransactionCommand();
-
-       /* Find the leader apply worker and signal it. */
-       logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
-
-       /* Stop gracefully */
-       proc_exit(0);
-}
-
 /*
  * Wait until the relation sync state is set in the catalog to the expected
  * one; return true when it happens.
@@ -180,7 +137,7 @@ finish_sync_worker(void)
  * CATCHUP state to SYNCDONE.
  */
 static bool
-wait_for_relation_state_change(Oid relid, char expected_state)
+wait_for_table_state_change(Oid relid, char expected_state)
 {
        char            state;
 
@@ -273,15 +230,6 @@ wait_for_worker_state_change(char expected_state)
        return false;
 }
 
-/*
- * Callback from syscache invalidation.
- */
-void
-invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
-{
-       table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
-}
-
 /*
  * Handle table synchronization cooperation from the synchronization
  * worker.
@@ -290,8 +238,8 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
  * predetermined synchronization point in the WAL stream, mark the table as
  * SYNCDONE and finish.
  */
-static void
-process_syncing_tables_for_sync(XLogRecPtr current_lsn)
+void
+ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
 {
        SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
@@ -349,9 +297,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
                /*
                 * Start a new transaction to clean up the tablesync origin tracking.
-                * This transaction will be ended within the finish_sync_worker().
-                * Now, even, if we fail to remove this here, the apply worker will
-                * ensure to clean it up afterward.
+                * This transaction will be ended within the FinishSyncWorker(). Now,
+                * even, if we fail to remove this here, the apply worker will ensure
+                * to clean it up afterward.
                 *
                 * We need to do this after the table state is set to SYNCDONE.
                 * Otherwise, if an error occurs while performing the database
@@ -387,7 +335,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                 */
                replorigin_drop_by_name(originname, true, false);
 
-               finish_sync_worker();
+               FinishSyncWorker();
        }
        else
                SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -414,8 +362,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  * If the synchronization position is reached (SYNCDONE), then the table can
  * be marked as READY and is no longer tracked.
  */
-static void
-process_syncing_tables_for_apply(XLogRecPtr current_lsn)
+void
+ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
 {
        struct tablesync_start_time_mapping
        {
@@ -431,7 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
        Assert(!IsTransactionState());
 
        /* We need up-to-date sync state info for subscription tables here. */
-       FetchTableStates(&started_tx);
+       FetchRelationStates(&started_tx);
 
        /*
         * Prepare a hash table for tracking last start times of workers, to avoid
@@ -586,8 +534,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                                        StartTransactionCommand();
                                        started_tx = true;
 
-                                       wait_for_relation_state_change(rstate->relid,
-                                                                                                  SUBREL_STATE_SYNCDONE);
+                                       wait_for_table_state_change(rstate->relid,
+                                                                                               SUBREL_STATE_SYNCDONE);
                                }
                                else
                                        LWLockRelease(LogicalRepWorkerLock);
@@ -689,37 +637,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
        }
 }
 
-/*
- * Process possible state change(s) of tables that are being synchronized.
- */
-void
-process_syncing_tables(XLogRecPtr current_lsn)
-{
-       switch (MyLogicalRepWorker->type)
-       {
-               case WORKERTYPE_PARALLEL_APPLY:
-
-                       /*
-                        * Skip for parallel apply workers because they only operate on
-                        * tables that are in a READY state. See pa_can_start() and
-                        * should_apply_changes_for_rel().
-                        */
-                       break;
-
-               case WORKERTYPE_TABLESYNC:
-                       process_syncing_tables_for_sync(current_lsn);
-                       break;
-
-               case WORKERTYPE_APPLY:
-                       process_syncing_tables_for_apply(current_lsn);
-                       break;
-
-               case WORKERTYPE_UNKNOWN:
-                       /* Should never happen. */
-                       elog(ERROR, "Unknown worker type");
-       }
-}
-
 /*
  * Create list of columns for COPY based on logical relation mapping.
  */
@@ -1356,7 +1273,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                case SUBREL_STATE_SYNCDONE:
                case SUBREL_STATE_READY:
                case SUBREL_STATE_UNKNOWN:
-                       finish_sync_worker();   /* doesn't return */
+                       FinishSyncWorker(); /* doesn't return */
        }
 
        /* Calculate the name of the tablesync slot. */
@@ -1599,77 +1516,6 @@ copy_table_done:
        return slotname;
 }
 
-/*
- * Common code to fetch the up-to-date sync state info into the static lists.
- *
- * Returns true if subscription has 1 or more tables, else false.
- *
- * Note: If this function started the transaction (indicated by the parameter)
- * then it is the caller's responsibility to commit it.
- */
-static bool
-FetchTableStates(bool *started_tx)
-{
-       static bool has_subrels = false;
-
-       *started_tx = false;
-
-       if (table_states_validity != SYNC_TABLE_STATE_VALID)
-       {
-               MemoryContext oldctx;
-               List       *rstates;
-               ListCell   *lc;
-               SubscriptionRelState *rstate;
-
-               table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
-
-               /* Clean the old lists. */
-               list_free_deep(table_states_not_ready);
-               table_states_not_ready = NIL;
-
-               if (!IsTransactionState())
-               {
-                       StartTransactionCommand();
-                       *started_tx = true;
-               }
-
-               /* Fetch all non-ready tables. */
-               rstates = GetSubscriptionRelations(MySubscription->oid, true);
-
-               /* Allocate the tracking info in a permanent memory context. */
-               oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-               foreach(lc, rstates)
-               {
-                       rstate = palloc(sizeof(SubscriptionRelState));
-                       memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
-                       table_states_not_ready = lappend(table_states_not_ready, rstate);
-               }
-               MemoryContextSwitchTo(oldctx);
-
-               /*
-                * Does the subscription have tables?
-                *
-                * If there were not-READY relations found then we know it does. But
-                * if table_states_not_ready was empty we still need to check again to
-                * see if there are 0 tables.
-                */
-               has_subrels = (table_states_not_ready != NIL) ||
-                       HasSubscriptionRelations(MySubscription->oid);
-
-               /*
-                * If the subscription relation cache has been invalidated since we
-                * entered this routine, we still use and return the relations we just
-                * finished constructing, to avoid infinite loops, but we leave the
-                * table states marked as stale so that we'll rebuild it again on next
-                * access. Otherwise, we mark the table states as valid.
-                */
-               if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
-                       table_states_validity = SYNC_TABLE_STATE_VALID;
-       }
-
-       return has_subrels;
-}
-
 /*
  * Execute the initial sync with error handling. Disable the subscription,
  * if it's required.
@@ -1755,7 +1601,7 @@ TablesyncWorkerMain(Datum main_arg)
 
        run_tablesync_worker();
 
-       finish_sync_worker();
+       FinishSyncWorker();
 }
 
 /*
@@ -1773,7 +1619,7 @@ AllTablesyncsReady(void)
        bool            has_subrels = false;
 
        /* We need up-to-date sync state info for subscription tables here. */
-       has_subrels = FetchTableStates(&started_tx);
+       has_subrels = FetchRelationStates(&started_tx);
 
        if (started_tx)
        {
@@ -1789,21 +1635,21 @@ AllTablesyncsReady(void)
 }
 
 /*
- * Return whether the subscription currently has any relations.
+ * Return whether the subscription currently has any tables.
  *
- * Note: Unlike HasSubscriptionRelations(), this function relies on cached
- * information for subscription relations. Additionally, it should not be
+ * Note: Unlike HasSubscriptionTables(), this function relies on cached
+ * information for subscription tables. Additionally, it should not be
  * invoked outside of apply or tablesync workers, as MySubscription must be
  * initialized first.
  */
 bool
-HasSubscriptionRelationsCached(void)
+HasSubscriptionTablesCached(void)
 {
        bool            started_tx;
        bool            has_subrels;
 
        /* We need up-to-date subscription tables info here */
-       has_subrels = FetchTableStates(&started_tx);
+       has_subrels = FetchRelationStates(&started_tx);
 
        if (started_tx)
        {
index 419e478b4c65839a9afb7cb29fe6f09985e9648b..3c58ad884762fa3fb4a4c54f1a786383c4d256c0 100644 (file)
@@ -91,7 +91,7 @@
  * behave as if two_phase = off. When the apply worker detects that all
  * tablesyncs have become READY (while the tri-state was PENDING) it will
  * restart the apply worker process. This happens in
- * process_syncing_tables_for_apply.
+ * ProcessSyncingTablesForApply.
  *
  * When the (re-started) apply worker finds that all tablesyncs are READY for a
  * two_phase tri-state of PENDING it start streaming messages with the
@@ -1243,7 +1243,7 @@ apply_handle_commit(StringInfo s)
        apply_handle_commit_internal(&commit_data);
 
        /* Process any tables that are being synchronized in parallel. */
-       process_syncing_tables(commit_data.end_lsn);
+       ProcessSyncingRelations(commit_data.end_lsn);
 
        pgstat_report_activity(STATE_IDLE, NULL);
        reset_apply_error_context_info();
@@ -1365,7 +1365,7 @@ apply_handle_prepare(StringInfo s)
        in_remote_transaction = false;
 
        /* Process any tables that are being synchronized in parallel. */
-       process_syncing_tables(prepare_data.end_lsn);
+       ProcessSyncingRelations(prepare_data.end_lsn);
 
        /*
         * Since we have already prepared the transaction, in a case where the
@@ -1421,7 +1421,7 @@ apply_handle_commit_prepared(StringInfo s)
        in_remote_transaction = false;
 
        /* Process any tables that are being synchronized in parallel. */
-       process_syncing_tables(prepare_data.end_lsn);
+       ProcessSyncingRelations(prepare_data.end_lsn);
 
        clear_subscription_skip_lsn(prepare_data.end_lsn);
 
@@ -1487,7 +1487,7 @@ apply_handle_rollback_prepared(StringInfo s)
        in_remote_transaction = false;
 
        /* Process any tables that are being synchronized in parallel. */
-       process_syncing_tables(rollback_data.rollback_end_lsn);
+       ProcessSyncingRelations(rollback_data.rollback_end_lsn);
 
        pgstat_report_activity(STATE_IDLE, NULL);
        reset_apply_error_context_info();
@@ -1622,7 +1622,7 @@ apply_handle_stream_prepare(StringInfo s)
        pgstat_report_stat(false);
 
        /* Process any tables that are being synchronized in parallel. */
-       process_syncing_tables(prepare_data.end_lsn);
+       ProcessSyncingRelations(prepare_data.end_lsn);
 
        /*
         * Similar to prepare case, the subskiplsn could be left in a case of
@@ -2464,7 +2464,7 @@ apply_handle_stream_commit(StringInfo s)
        }
 
        /* Process any tables that are being synchronized in parallel. */
-       process_syncing_tables(commit_data.end_lsn);
+       ProcessSyncingRelations(commit_data.end_lsn);
 
        pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -4133,7 +4133,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                        maybe_reread_subscription();
 
                        /* Process any table synchronization changes. */
-                       process_syncing_tables(last_received);
+                       ProcessSyncingRelations(last_received);
                }
 
                /* Cleanup the memory. */
@@ -4623,7 +4623,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
         * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
         * concurrently add tables to the subscription, the apply worker may not
         * process invalidations in time. Consequently,
-        * HasSubscriptionRelationsCached() might miss the new tables, leading to
+        * HasSubscriptionTablesCached() might miss the new tables, leading to
         * premature advancement of oldest_nonremovable_xid.
         *
         * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
@@ -4637,7 +4637,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
         * subscription tables at this stage to prevent unnecessary tuple
         * retention.
         */
-       if (HasSubscriptionRelationsCached() && !AllTablesyncsReady())
+       if (HasSubscriptionTablesCached() && !AllTablesyncsReady())
        {
                TimestampTz now;
 
@@ -5876,7 +5876,7 @@ SetupApplyOrSyncWorker(int worker_slot)
         * the subscription relation state.
         */
        CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
-                                                                 invalidate_syncing_table_states,
+                                                                 InvalidateSyncingRelStates,
                                                                  (Datum) 0);
 }
 
index a1976fae607d640aeda4809ee0021722bb96a04d..4e7303ea6317abc9a797f94614061fba12e3f732 100644 (file)
@@ -244,8 +244,8 @@ getSchemaData(Archive *fout, int *numTablesPtr)
        pg_log_info("reading subscriptions");
        getSubscriptions(fout);
 
-       pg_log_info("reading subscription membership of tables");
-       getSubscriptionTables(fout);
+       pg_log_info("reading subscription membership of relations");
+       getSubscriptionRelations(fout);
 
        free(inhinfo);                          /* not needed any longer */
 
index 641bece12c7673e58c9734999672117b6e63d2f0..890db7b08c23278c795b7ac87b780697d085ea56 100644 (file)
@@ -5305,12 +5305,12 @@ getSubscriptions(Archive *fout)
 }
 
 /*
- * getSubscriptionTables
- *       Get information about subscription membership for dumpable tables. This
+ * getSubscriptionRelations
+ *       Get information about subscription membership for dumpable relations. This
  *    will be used only in binary-upgrade mode for PG17 or later versions.
  */
 void
-getSubscriptionTables(Archive *fout)
+getSubscriptionRelations(Archive *fout)
 {
        DumpOptions *dopt = fout->dopt;
        SubscriptionInfo *subinfo = NULL;
@@ -5364,7 +5364,7 @@ getSubscriptionTables(Archive *fout)
 
                tblinfo = findTableByOid(relid);
                if (tblinfo == NULL)
-                       pg_fatal("failed sanity check, table with OID %u not found",
+                       pg_fatal("failed sanity check, relation with OID %u not found",
                                         relid);
 
                /* OK, make a DumpableObject for this relationship */
index fa6d1a510f7ddfc47aea6fde9976de777fff5c4a..72a00e1bc202898cfb06906bb2836cea0c108e25 100644 (file)
@@ -829,6 +829,6 @@ extern void getPublicationNamespaces(Archive *fout);
 extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
                                                                 int numTables);
 extern void getSubscriptions(Archive *fout);
-extern void getSubscriptionTables(Archive *fout);
+extern void getSubscriptionRelations(Archive *fout);
 
 #endif                                                 /* PG_DUMP_H */
index 02f97a547dd58b19c7539ada30d477f629469e35..61b63c6bb7a68d0b73eccf4ed1969bbab2c2817b 100644 (file)
@@ -89,7 +89,7 @@ extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
-extern bool HasSubscriptionRelations(Oid subid);
+extern bool HasSubscriptionTables(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
 
 extern void UpdateDeadTupleRetentionStatus(Oid subid, bool active);
index de003802612790b6f6cd8b9693b9c647ee4c2e0f..ae352f6e69121397cdfe0ce7fdd0fb673c86740b 100644 (file)
@@ -251,6 +251,8 @@ extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
+extern PGDLLIMPORT List *table_states_not_ready;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                                                                                bool only_running);
@@ -272,12 +274,16 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
                                                                                           char *originname, Size szoriginname);
 
 extern bool AllTablesyncsReady(void);
-extern bool HasSubscriptionRelationsCached(void);
+extern bool HasSubscriptionTablesCached(void);
 extern void UpdateTwoPhaseState(Oid suboid, char new_state);
 
-extern void process_syncing_tables(XLogRecPtr current_lsn);
-extern void invalidate_syncing_table_states(Datum arg, int cacheid,
-                                                                                       uint32 hashvalue);
+extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
+extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
+
+pg_noreturn extern void FinishSyncWorker(void);
+extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
+extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
+extern bool FetchRelationStates(bool *started_tx);
 
 extern void stream_start_internal(TransactionId xid, bool first_segment);
 extern void stream_stop_internal(TransactionId xid);
index 5290b91e83e791f02a0350a9e7b6bc9cbba9b074..ee1cab6190f533b18facdf1318a9e08f3d1fe25e 100644 (file)
@@ -2922,7 +2922,7 @@ SyncRepStandbyData
 SyncRequestHandler
 SyncRequestType
 SyncStandbySlotsConfigData
-SyncingTablesState
+SyncingRelationsState
 SysFKRelationship
 SysScanDesc
 SyscacheCallbackFunction