]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Disallow direct use of the pgrepack logical decoding plugin
authorÁlvaro Herrera <alvherre@kurilemu.de>
Tue, 9 Jun 2026 18:12:55 +0000 (20:12 +0200)
committerÁlvaro Herrera <alvherre@kurilemu.de>
Tue, 9 Jun 2026 18:12:55 +0000 (20:12 +0200)
Nothing is to be gained from using pgrepack outside of REPACK
(CONCURRENTLY), and it leads to assertion failures in assertion-enabled
builds, and to crashes due to bogus memory lifetime in production
builds.  Reject attempts to do that with a clean error report.

Clean up the nearby code a tad while at it.  The only functional changes
in that are that the output_writer_private context is allocated and
partially filled by the pgrepack output plugin; and that ->relid therein
is now always present (rather than only in assertion-enabled builds).
Other than that it's just minor code rearrangement and added comments.

Author: Álvaro Herrera <alvherre@kurilemu.de>
Reported-by: Nikita Kalinin <n.kalinin@postgrespro.ru>
Suggested-by: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: Antonin Houska <ah@cybertec.at> (older version)
Reviewed-by: Srinath Reddy Sadipiralla <srinath2133@gmail.com> (older version)
Discussion: https://postgr.es/m/19500-38a02529a69353a5@postgresql.org

contrib/test_decoding/expected/repack.out
contrib/test_decoding/sql/repack.sql
src/backend/commands/repack_worker.c
src/backend/replication/pgrepack/pgrepack.c
src/include/commands/repack_internal.h

index 6204e620b43eac538c2b12e32703d603a591dd38..c4ff41be6901b18ada036e81196a23b52ba9c29c 100644 (file)
@@ -101,3 +101,14 @@ DETAIL:  REPACK (CONCURRENTLY) does not support deferrable primary keys.
 HINT:  Use ALTER TABLE ... REPLICA IDENTITY USING INDEX to designate another index as replica identity.
 -- clean up
 DROP TABLE repack_conc_replident, clstrpart;
+-- verify that the pgrepack plugin cannot be called directly
+CREATE TABLE repack_plugin (a int);
+SELECT * FROM pg_create_logical_replication_slot('s_repack', 'pgrepack');
+ERROR:  unsupported use of logical decoding plugin "pgrepack"
+DETAIL:  This plugin can only be used by REPACK (CONCURRENTLY).
+CONTEXT:  slot "s_repack", output plugin "pgrepack", in the startup callback
+INSERT INTO repack_plugin VALUES (1);
+SELECT * FROM pg_logical_slot_get_binary_changes('s_repack', NULL, NULL);
+ERROR:  replication slot "s_repack" does not exist
+SELECT pg_drop_replication_slot('s_repack');
+ERROR:  replication slot "s_repack" does not exist
index cea3bd336895da7308be6c9d50a7749dc4ab22d4..f461f5479f4badefe35f8666b37f25ba0b7f6df6 100644 (file)
@@ -75,3 +75,10 @@ REPACK (CONCURRENTLY) repack_conc_replident;
 
 -- clean up
 DROP TABLE repack_conc_replident, clstrpart;
+
+-- verify that the pgrepack plugin cannot be called directly
+CREATE TABLE repack_plugin (a int);
+SELECT * FROM pg_create_logical_replication_slot('s_repack', 'pgrepack');
+INSERT INTO repack_plugin VALUES (1);
+SELECT * FROM pg_logical_slot_get_binary_changes('s_repack', NULL, NULL);
+SELECT pg_drop_replication_slot('s_repack');
index b6b7b604b4fe2057251f02f675a2e136b22fa39a..db9ff057cc6b310d230788daeae267a54a0d5999 100644 (file)
@@ -28,7 +28,7 @@
 #include "tcop/tcopprot.h"
 #include "utils/memutils.h"
 
-#define REPL_PLUGIN_NAME   "pgrepack"
+#define PGREPACK_PLUGIN   "pgrepack"
 
 static void RepackWorkerShutdown(int code, Datum arg);
 static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
@@ -197,7 +197,7 @@ repack_setup_logical_decoding(Oid relid)
        Relation        rel;
        Oid                     toastrelid;
        LogicalDecodingContext *ctx;
-       NameData        slotname;
+       char            slotname[NAMEDATALEN];
        RepackDecodingState *dstate;
        MemoryContext oldcxt;
 
@@ -207,43 +207,26 @@ repack_setup_logical_decoding(Oid relid)
         */
        Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
 
-       /*
-        * Make sure we can use logical decoding.
-        */
+       /* Make sure we can use logical decoding */
        CheckLogicalDecodingRequirements(true);
 
        /*
-        * A single backend should not execute multiple REPACK commands at a time,
-        * so use PID to make the slot unique.
+        * Create the replication slot we'll use, and enable logical decoding in
+        * case it isn't already on.
         *
-        * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
+        * Make the slot RS_TEMPORARY so that it's removed on ERROR.  A backend
+        * cannot execute multiple REPACK commands at a time, so the PID is enough
+        * to make the slot name unique.
         */
-       snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
-       ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
+       snprintf(slotname, NAMEDATALEN, "pg_repack_%d", MyProcPid);
+       ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true,
                                                  false, false);
-
        EnsureLogicalDecodingEnabled();
 
        /*
-        * Neither prepare_write nor do_write callback nor update_progress is
-        * useful for us.
-        */
-       ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
-                                                                       NIL,
-                                                                       true,
-                                                                       true,
-                                                                       InvalidXLogRecPtr,
-                                                                       XL_ROUTINE(.page_read = read_local_xlog_page,
-                                                                                          .segment_open = wal_segment_open,
-                                                                                          .segment_close = wal_segment_close),
-                                                                       NULL, NULL, NULL);
-
-       /*
-        * We don't have control on setting fast_forward, so at least check it.
+        * Set up repacked_rel_locator and repacked_rel_toast_locator, which we
+        * use to skip decoding of unrelated relations.
         */
-       Assert(!ctx->fast_forward);
-
-       /* Avoid logical decoding of other relations. */
        rel = table_open(relid, AccessShareLock);
        repacked_rel_locator = rel->rd_locator;
        toastrelid = rel->rd_rel->reltoastrelid;
@@ -258,15 +241,35 @@ repack_setup_logical_decoding(Oid relid)
        }
        table_close(rel, AccessShareLock);
 
-       DecodingContextFindStartpoint(ctx);
-
        /*
-        * decode_concurrent_changes() needs non-blocking callback.
+        * Set up our logical decoding context.  We initially use the blocking
+        * read_local_xlog_page until we find the start point, and switch to the
+        * non-blocking interface afterwards.
         */
-       ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
+       ctx = CreateInitDecodingContext(PGREPACK_PLUGIN,
+                                                                       NIL,
+                                                                       true,
+                                                                       true,
+                                                                       InvalidXLogRecPtr,
+                                                                       XL_ROUTINE(.page_read = read_local_xlog_page,
+                                                                                          .segment_open = wal_segment_open,
+                                                                                          .segment_close = wal_segment_close),
+                                                                       NULL, NULL, NULL);
 
-       /* Some WAL records should have been read. */
-       Assert(XLogRecPtrIsValid(ctx->reader->EndRecPtr));
+       /* Complete setup of output_writer_private */
+       dstate = (RepackDecodingState *) ctx->output_writer_private;
+       dstate->relid = relid;
+       dstate->worker_cxt = CurrentMemoryContext;
+       dstate->worker_resowner = CurrentResourceOwner;
+
+       /* We don't have control on fast_forward, but verify it's sane */
+       Assert(!ctx->fast_forward);
+
+       /* Find our decoding starting point. */
+       DecodingContextFindStartpoint(ctx);
+
+       /* From this point on, we need non-blocking WAL reads */
+       ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
 
        /*
         * Initialize repack_current_segment so that we can notice WAL segment
@@ -275,36 +278,15 @@ repack_setup_logical_decoding(Oid relid)
        XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
                                wal_segment_size);
 
-       /* Our private state belongs to the decoding context. */
-       oldcxt = MemoryContextSwitchTo(ctx->context);
-
        /*
-        * read_local_xlog_page_no_wait() needs to be able to indicate the end of
-        * WAL.
+        * Set up our reader private state to let the page-read callback notify
+        * when end-of-WAL has been reached.  This lives in the same context as
+        * the logical decoding itself.
         */
+       oldcxt = MemoryContextSwitchTo(ctx->context);
        ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
-       dstate = palloc0_object(RepackDecodingState);
        MemoryContextSwitchTo(oldcxt);
 
-#ifdef USE_ASSERT_CHECKING
-       dstate->relid = relid;
-#endif
-
-       dstate->change_cxt = AllocSetContextCreate(ctx->context,
-                                                                                          "REPACK - change",
-                                                                                          ALLOCSET_DEFAULT_SIZES);
-
-       /* The file will be set as soon as we have it opened. */
-       dstate->file = NULL;
-
-       /*
-        * Memory context and resource owner for long-lived resources.
-        */
-       dstate->worker_cxt = CurrentMemoryContext;
-       dstate->worker_resowner = CurrentResourceOwner;
-
-       ctx->output_writer_private = dstate;
-
        return ctx;
 }
 
index a2615ce54c1e3b7427b01ea3add3136b7af5ac3e..959551f5724d1f9f7fd61312b98e85cd13f1c188 100644 (file)
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/detoast.h"
+#include "commands/repack.h"
 #include "commands/repack_internal.h"
 #include "replication/snapbuild.h"
 #include "utils/memutils.h"
@@ -47,7 +48,24 @@ static void
 repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                           bool is_init)
 {
-       ctx->output_plugin_private = NULL;
+       RepackDecodingState *dstate;
+
+       if (!AmRepackWorker())
+               ereport(ERROR,
+                               errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                               errmsg("unsupported use of logical decoding plugin \"%s\"",
+                                          "pgrepack"),
+                               errdetail("This plugin can only be used by %s.",
+                                                 "REPACK (CONCURRENTLY)"));
+
+       /* Initial setup of our private state */
+       Assert(CurrentMemoryContext == ctx->context);
+       dstate = palloc0_object(RepackDecodingState);
+       dstate->change_cxt = AllocSetContextCreate(ctx->context,
+                                                                                          "REPACK - change",
+                                                                                          ALLOCSET_DEFAULT_SIZES);
+       /* repack_setup_logical_decoding fills in the rest */
+       ctx->output_writer_private = dstate;
 
        /* Probably unnecessary, as we don't use the SQL interface ... */
        opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
index 6a85cee8910123a6207b24f0ce8591c7ca35d217..42111aa4ae3313b2bf138b7564a235d404253b57 100644 (file)
@@ -39,10 +39,8 @@ typedef char ConcurrentChangeKind;
  */
 typedef struct RepackDecodingState
 {
-#ifdef USE_ASSERT_CHECKING
        /* The relation whose changes we're decoding. */
        Oid                     relid;
-#endif
 
        /* Per-change memory context. */
        MemoryContext change_cxt;