]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Revert "Allow logical replication snapshots to be database-specific" master github/master
authorÁlvaro Herrera <alvherre@kurilemu.de>
Sun, 24 May 2026 04:33:19 +0000 (21:33 -0700)
committerÁlvaro Herrera <alvherre@kurilemu.de>
Sun, 24 May 2026 04:33:19 +0000 (21:33 -0700)
This reverts commit 0d3dba38c777, which was determined to have
fundamental flaws.  This restricts REPACK (CONCURRENTLY) so that only
one process can run it concurrently on different tables and even on
different databases; we'll lift that restriction in another way during
the next development cycle.

Reported-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1Jg21ODQ7fS2fvN5W_S5kDRhAP5inj3XMRQaa=s-GbYhw@mail.gmail.com

21 files changed:
contrib/pg_visibility/pg_visibility.c
doc/src/sgml/logicaldecoding.sgml
src/backend/access/index/genam.c
src/backend/access/rmgrdesc/standbydesc.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogfuncs.c
src/backend/postmaster/bgwriter.c
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/snapbuild.c
src/backend/replication/pgrepack/pgrepack.c
src/backend/replication/slot.c
src/backend/storage/ipc/procarray.c
src/backend/storage/ipc/standby.c
src/include/access/xlog_internal.h
src/include/miscadmin.h
src/include/replication/output_plugin.h
src/include/replication/snapbuild.h
src/include/storage/procarray.h
src/include/storage/standby.h
src/include/storage/standbydefs.h

index d564bd2a00c7d5824be11f9971019defa01c3bf9..dfab0b64cf50baabf7a7baedbeda31189188b6b6 100644 (file)
@@ -621,7 +621,7 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
        else if (rel == NULL || rel->rd_rel->relisshared)
        {
                /* Shared relation: take into account all running xids */
-               runningTransactions = GetRunningTransactionData(InvalidOid);
+               runningTransactions = GetRunningTransactionData();
                LWLockRelease(ProcArrayLock);
                LWLockRelease(XidGenLock);
                return runningTransactions->oldestRunningXid;
@@ -632,7 +632,7 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
                 * Normal relation: take into account xids running within the current
                 * database
                 */
-               runningTransactions = GetRunningTransactionData(InvalidOid);
+               runningTransactions = GetRunningTransactionData();
                LWLockRelease(ProcArrayLock);
                LWLockRelease(XidGenLock);
                return runningTransactions->oldestDatabaseRunningXid;
index 9b1d68d0de61e9db68696a5d77d1c9df93763796..6dc49108997dda6e6ef94b7bc7ae6dc9f997fe53 100644 (file)
@@ -959,7 +959,6 @@ typedef struct OutputPluginOptions
 {
     OutputPluginOutputType output_type;
     bool        receive_rewrites;
-    bool        need_shared_catalogs;
 } OutputPluginOptions;
 </programlisting>
       <literal>output_type</literal> has to either be set to
@@ -970,9 +969,6 @@ typedef struct OutputPluginOptions
       also be called for changes made by heap rewrites during certain DDL
       operations.  These are of interest to plugins that handle DDL
       replication, but they require special handling.
-      <literal>need_shared_catalogs</literal> can be set to false if you are
-      certain the plugin functions do not access shared system catalogs.
-      Doing so can speed up creation of replication slots that use this plugin.
      </para>
 
      <para>
index 97d44b846229637bce5ba01f4c4ef6bb20975a38..1408989c5687378233152d1dce3cbcac4cae225a 100644 (file)
@@ -394,14 +394,6 @@ systable_beginscan(Relation heapRelation,
        SysScanDesc sysscan;
        Relation        irel;
 
-       /*
-        * If this backend promised that it won't access shared catalogs during
-        * logical decoding, this it the right place to verify.
-        */
-       Assert(!HistoricSnapshotActive() ||
-                  accessSharedCatalogsInDecoding ||
-                  !heapRelation->rd_rel->relisshared);
-
        if (indexOK &&
                !IgnoreSystemIndexes &&
                !ReindexIsProcessingIndex(indexId))
index 685d1bdb02413fac3b02c2127a925188bf834a31..0a291354ae23af745a01f6f87f9363a803b8c9af 100644 (file)
@@ -41,8 +41,6 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
                for (i = 0; i < xlrec->subxcnt; i++)
                        appendStringInfo(buf, " %u", xlrec->xids[xlrec->xcnt + i]);
        }
-
-       appendStringInfo(buf, "; dbid: %u", xlrec->dbid);
 }
 
 void
index d34e34a56c508243cf96f662b0e20e9730cfc842..beddcb552d6503b3d2f724f5a8756c8761684eb3 100644 (file)
@@ -7735,7 +7735,7 @@ CreateCheckPoint(int flags)
         * recovery we don't need to write running xact data.
         */
        if (!shutdown && XLogStandbyInfoActive())
-               LogStandbySnapshot(InvalidOid);
+               LogStandbySnapshot();
 
        START_CRIT_SECTION();
 
index 0f5979691e6bf70b406f89f348cf89ee2987c72c..65bbaeda59c4e707ab484c5f50bef4337793f87f 100644 (file)
@@ -245,7 +245,7 @@ pg_log_standby_snapshot(PG_FUNCTION_ARGS)
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("pg_log_standby_snapshot() can only be used if \"wal_level\" >= \"replica\"")));
 
-       recptr = LogStandbySnapshot(InvalidOid);
+       recptr = LogStandbySnapshot();
 
        /*
         * As a convenience, return the WAL location of the last inserted record
index cd1bf9d919c1e15ccafb847c418e98513a1664b2..d364382303af5a4b154d28b5006ce118c116432a 100644 (file)
@@ -289,7 +289,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
                        if (now >= timeout &&
                                last_snapshot_lsn <= GetLastImportantRecPtr())
                        {
-                               last_snapshot_lsn = LogStandbySnapshot(InvalidOid);
+                               last_snapshot_lsn = LogStandbySnapshot();
                                last_snapshot_ts = now;
                        }
                }
index 38c5a4f554070d14a5b818bf686e425217081cf7..c944be4ac83c8a45fda267290dfa3cef9cda236e 100644 (file)
@@ -386,12 +386,8 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                                 * Update this decoder's idea of transactions currently
                                 * running.  In doing so we will determine whether we have
                                 * reached consistent status.
-                                *
-                                * If the output plugin doesn't need access to shared
-                                * catalogs, we can ignore transactions in other databases.
                                 */
-                               SnapBuildProcessRunningXacts(builder, buf->origptr, running,
-                                                                                        !ctx->options.need_shared_catalogs);
+                               SnapBuildProcessRunningXacts(builder, buf->origptr, running);
 
                                /*
                                 * Abort all transactions that we keep track of, that are
@@ -401,12 +397,8 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
                                 * all running transactions which includes prepared ones,
                                 * while shutdown checkpoints just know that no non-prepared
                                 * transactions are in progress.
-                                *
-                                * The database-specific records might work here too, but it's
-                                * not their purpose.
                                 */
-                               if (!OidIsValid(running->dbid))
-                                       ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
+                               ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
                        }
                        break;
                case XLOG_STANDBY_LOCK:
index a33a685dcc6d5adc7dd1b066895bedad9184c384..b969caae72ef0032e38d08391327be44ec298d58 100644 (file)
@@ -285,9 +285,6 @@ StartupDecodingContext(List *output_plugin_options,
        ctx->write = do_write;
        ctx->update_progress = update_progress;
 
-       /* Assume shared catalog access. The startup callback can change it. */
-       ctx->options.need_shared_catalogs = true;
-
        ctx->output_plugin_options = output_plugin_options;
 
        ctx->fast_forward = fast_forward;
index c8309b96ed45ce0e86b485d52b9b2732e111e6e9..b89922349249e8231e079dfbe8070e6458152e67 100644 (file)
 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
 static bool ExportInProgress = false;
 
-/*
- * If a backend is going to do logical decoding and the output plugin does
- * not need to access shared catalogs, setting this variable to false can make
- * the decoding startup faster. In particular, the backend will not need to
- * wait for completion of already running transactions in other databases.
- */
-bool           accessSharedCatalogsInDecoding = true;
-
 /* ->committed and ->catchange manipulation */
 static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
 
@@ -235,9 +227,6 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 
        MemoryContextSwitchTo(oldcontext);
 
-       /* The default is that shared catalog are used. */
-       accessSharedCatalogsInDecoding = true;
-
        return builder;
 }
 
@@ -256,9 +245,6 @@ FreeSnapshotBuilder(SnapBuild *builder)
                builder->snapshot = NULL;
        }
 
-       /* The default is that shared catalog are used. */
-       accessSharedCatalogsInDecoding = true;
-
        /* other resources are deallocated via memory context reset */
        MemoryContextDelete(context);
 }
@@ -1151,8 +1137,7 @@ SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
  * anymore.
  */
 void
-SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running,
-                                                        bool db_specific)
+SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 {
        ReorderBufferTXN *txn;
        TransactionId xmin;
@@ -1164,33 +1149,6 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
         */
        if (builder->state < SNAPBUILD_CONSISTENT)
        {
-               /*
-                * To reduce the potential for unnecessarily waiting for completion of
-                * unrelated transactions, the caller can declare that only
-                * transactions of the current database are relevant at this stage.
-                */
-               if (db_specific)
-               {
-                       /*
-                        * If we must only keep track of transactions running in the
-                        * current database, we need transaction info from exactly that
-                        * database.
-                        */
-                       if (running->dbid != MyDatabaseId)
-                       {
-                               LogStandbySnapshot(MyDatabaseId);
-
-                               return;
-                       }
-
-                       /*
-                        * We'd better be able to check during scan if the plugin does not
-                        * lie.
-                        */
-                       if (accessSharedCatalogsInDecoding)
-                               accessSharedCatalogsInDecoding = false;
-               }
-
                /* returns false if there's no point in performing cleanup just yet */
                if (!SnapBuildFindSnapshot(builder, lsn, running))
                        return;
@@ -1198,16 +1156,6 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
        else
                SnapBuildSerialize(builder, lsn);
 
-       /*
-        * Database specific transaction info may exist to reach CONSISTENT state
-        * faster, however the code below makes no use of it. Moreover, such
-        * record might cause problems because the following normal (cluster-wide)
-        * record can have lower value of oldestRunningXid. In that case, let's
-        * wait with the cleanup for the next regular cluster-wide record.
-        */
-       if (OidIsValid(running->dbid))
-               return;
-
        /*
         * Update range of interesting xids based on the running xacts
         * information. We don't increase ->xmax using it, because once we are in
@@ -1518,11 +1466,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
         */
        if (!RecoveryInProgress())
        {
-               /*
-                * If the last transaction info was about specific database, so needs
-                * to be the next one - at least until we're in the CONSISTENT state.
-                */
-               LogStandbySnapshot(running->dbid);
+               LogStandbySnapshot();
        }
 }
 
index eb9a883d7a9f3adcecfcd3fed1cd527ee93cb177..a2615ce54c1e3b7427b01ea3add3136b7af5ac3e 100644 (file)
@@ -52,13 +52,6 @@ repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
        /* Probably unnecessary, as we don't use the SQL interface ... */
        opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
 
-       /*
-        * REPACK doesn't need access to shared catalogs, so we can speed up the
-        * historic snapshot creation by setting this flag.  We'll only have to
-        * wait for transactions in our database.
-        */
-       opt->need_shared_catalogs = false;
-
        if (ctx->output_plugin_options != NIL)
        {
                ereport(ERROR,
index 83fcde74718080ea1135b5614f3d8b0e4132b20e..c0c9f514f7b909311d20a3c7d454bd7a7c904bcb 100644 (file)
@@ -1776,7 +1776,7 @@ ReplicationSlotReserveWal(void)
                XLogRecPtr      flushptr;
 
                /* make sure we have enough information to start */
-               flushptr = LogStandbySnapshot(InvalidOid);
+               flushptr = LogStandbySnapshot();
 
                /* and make sure it's fsynced to disk */
                XLogFlush(flushptr);
index 9299bcebbda87c4249d15ae7c72bf5aebd34e1bf..f540bb6b23f04ed272c91b7ca1724ee9bc58ab1a 100644 (file)
@@ -2623,11 +2623,9 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
  *
  * Note that if any transaction has overflowed its cached subtransactions
  * then there is no real need include any subtransactions.
- *
- * If 'dbid' is valid, only gather transactions running in that database.
  */
 RunningTransactions
-GetRunningTransactionData(Oid dbid)
+GetRunningTransactionData(void)
 {
        /* result workspace */
        static RunningTransactionsData CurrentRunningXactsData;
@@ -2702,18 +2700,6 @@ GetRunningTransactionData(Oid dbid)
                if (!TransactionIdIsValid(xid))
                        continue;
 
-               /*
-                * Filter by database OID if requested.
-                */
-               if (OidIsValid(dbid))
-               {
-                       int                     pgprocno = arrayP->pgprocnos[index];
-                       PGPROC     *proc = &allProcs[pgprocno];
-
-                       if (proc->databaseId != dbid)
-                               continue;
-               }
-
                /*
                 * Be careful not to exclude any xids before calculating the values of
                 * oldestRunningXid and suboverflowed, since these are used to clean
@@ -2764,12 +2750,6 @@ GetRunningTransactionData(Oid dbid)
                        PGPROC     *proc = &allProcs[pgprocno];
                        int                     nsubxids;
 
-                       /*
-                        * Filter by database OID if requested.
-                        */
-                       if (OidIsValid(dbid) && proc->databaseId != dbid)
-                               continue;
-
                        /*
                         * Save subtransaction XIDs. Other backends can't add or remove
                         * entries while we're holding XidGenLock.
@@ -2803,7 +2783,6 @@ GetRunningTransactionData(Oid dbid)
         * increases if slots do.
         */
 
-       CurrentRunningXacts->dbid = dbid;
        CurrentRunningXacts->xcnt = count - subcount;
        CurrentRunningXacts->subxcnt = subcount;
        CurrentRunningXacts->subxid_status = suboverflowed ? SUBXIDS_IN_SUBTRANS : SUBXIDS_IN_ARRAY;
index 29af7733948322da179ec2240940b4f883af9c0d..de9092fdf5bc9c5d1f83e3c284fdba9a63d58206 100644 (file)
@@ -1188,14 +1188,6 @@ standby_redo(XLogReaderState *record)
                xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
                RunningTransactionsData running;
 
-               /*
-                * Records issued for specific database are not suitable for physical
-                * replication because that affects the whole cluster. In particular,
-                * the list of XID is probably incomplete here.
-                */
-               if (OidIsValid(xlrec->dbid))
-                       return;
-
                running.xcnt = xlrec->xcnt;
                running.subxcnt = xlrec->subxcnt;
                running.subxid_status = xlrec->subxid_overflow ? SUBXIDS_MISSING : SUBXIDS_IN_ARRAY;
@@ -1285,22 +1277,11 @@ standby_redo(XLogReaderState *record)
  * as there's no independent knob to just enable logical decoding. For
  * details of how this is used, check snapbuild.c's introductory comment.
  *
- * If 'dbid' is valid, only gather transactions running in that
- * database. snapbuild.c can use such running xacts information for faster
- * startup, but it still needs normal (cluster-wide) during the actual
- * decoding - see standby_decode() and SnapBuildProcessRunningXacts() for
- * details. Other processes (e.g. checkpointer) issue the cluster-wide records
- * whether logical decoding is active or not.
- *
- * Please be careful about using this argument for other purposes. In
- * particular, physical replication *must* ignore the database-specific
- * records, exactly because they do not cover the whole cluster - see
- * standby_redo().
  *
  * Returns the RecPtr of the last inserted record.
  */
 XLogRecPtr
-LogStandbySnapshot(Oid dbid)
+LogStandbySnapshot(void)
 {
        XLogRecPtr      recptr;
        RunningTransactions running;
@@ -1333,7 +1314,7 @@ LogStandbySnapshot(Oid dbid)
         * Log details of all in-progress transactions. This should be the last
         * record we write, because standby will open up when it sees this.
         */
-       running = GetRunningTransactionData(dbid);
+       running = GetRunningTransactionData();
 
        /*
         * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
@@ -1377,7 +1358,6 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
        xl_running_xacts xlrec;
        XLogRecPtr      recptr;
 
-       xlrec.dbid = CurrRunningXacts->dbid;
        xlrec.xcnt = CurrRunningXacts->xcnt;
        xlrec.subxcnt = CurrRunningXacts->subxcnt;
        xlrec.subxid_overflow = (CurrRunningXacts->subxid_status != SUBXIDS_IN_ARRAY);
index 13ae3ad4fbbb30f66b6ed753b3757b9e54e4d7be..55663e6f4afabf179a67c223c5e4785c59196a32 100644 (file)
@@ -32,7 +32,7 @@
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD11F /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD120 /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
index 8ccdf61246b1595e115bade273bba2a9fc1d14e5..7de0a11540236b66eca5230d486baa71b46db059 100644 (file)
@@ -312,15 +312,6 @@ extern void PreventCommandIfReadOnly(const char *cmdname);
 extern void PreventCommandIfParallelMode(const char *cmdname);
 extern void PreventCommandDuringRecovery(const char *cmdname);
 
-/* in replication/snapbuild.c */
-
-/*
- * Keep track of whether logical decoding in this backend promised not to
- * access shared catalogs, as a safety check.  This is checked by genam.c when
- * a catalog scan takes place to verify that no shared catalogs are accessed.
- */
-extern PGDLLIMPORT bool accessSharedCatalogsInDecoding;
-
 /*****************************************************************************
  *       pdir.h --                                                                                                                              *
  *                     POSTGRES directory path definitions.                             *
index 917f3cff2320a7e164d0476cc1cdc3beca03e9e6..842fcde67f90a77bda1b16fc7f545bc524228ebc 100644 (file)
@@ -27,7 +27,6 @@ typedef struct OutputPluginOptions
 {
        OutputPluginOutputType output_type;
        bool            receive_rewrites;
-       bool            need_shared_catalogs;
 } OutputPluginOptions;
 
 /*
index d02530a912a0c4e0f167218b7551a7a6423a7514..a22a83a2f237c8a33c272b210ef4f2eee7bfa6c3 100644 (file)
@@ -92,8 +92,7 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
                                                                   XLogRecPtr lsn,
                                                                   xl_heap_new_cid *xlrec);
 extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
-                                                                                xl_running_xacts *running,
-                                                                                bool db_specific);
+                                                                                xl_running_xacts *running);
 extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
 
 extern bool SnapBuildSnapshotExists(XLogRecPtr lsn);
index ec89c4482204d52e4e3d3b6697050af5c4b5ddeb..d718a5b542f04fcea84eccea19d0ea0fd64e4ac4 100644 (file)
@@ -47,7 +47,7 @@ extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
                                                                                 VirtualTransactionId *sourcevxid);
 extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
 
-extern RunningTransactions GetRunningTransactionData(Oid dbid);
+extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern TransactionId GetOldestNonRemovableTransactionId(Relation rel);
index 8715c08e94f201c5eaea5042fd3a7400f94a35c2..6a314c693cde733c263077230a5b12b260507063 100644 (file)
@@ -126,7 +126,6 @@ typedef enum
 
 typedef struct RunningTransactionsData
 {
-       Oid                     dbid;                   /* only track xacts in this database */
        int                     xcnt;                   /* # of xact ids in xids[] */
        int                     subxcnt;                /* # of subxact ids in xids[] */
        subxids_array_status subxid_status;
@@ -144,7 +143,7 @@ typedef RunningTransactionsData *RunningTransactions;
 extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
 extern void LogAccessExclusiveLockPrepare(void);
 
-extern XLogRecPtr LogStandbySnapshot(Oid dbid);
+extern XLogRecPtr LogStandbySnapshot(void);
 extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
                                                                        bool relcacheInitFileInval);
 
index e75b70787665e455512642f81981495922e9ef3e..231d251fd51c97d292dfda2fc43b3f1db7f1b2d6 100644 (file)
@@ -46,7 +46,6 @@ typedef struct xl_standby_locks
  */
 typedef struct xl_running_xacts
 {
-       Oid                     dbid;                   /* only track xacts in this database */
        int                     xcnt;                   /* # of xact ids in xids[] */
        int                     subxcnt;                /* # of subxact ids in xids[] */
        bool            subxid_overflow;        /* snapshot overflowed, subxids missing */