From: Álvaro Herrera Date: Tue, 9 Jun 2026 18:12:55 +0000 (+0200) Subject: Disallow direct use of the pgrepack logical decoding plugin X-Git-Url: http://git.ipfire.org/gitweb/?a=commitdiff_plain;h=cd7b204b2df99516a48a738e662d8512e706b519;p=thirdparty%2Fpostgresql.git Disallow direct use of the pgrepack logical decoding plugin Nothing is to be gained from using pgrepack outside of REPACK (CONCURRENTLY), and it leads to assertion failures in assertion-enabled builds, and to crashes due to bogus memory lifetime in production builds. Reject attempts to do that with a clean error report. Clean up the nearby code a tad while at it. The only functional changes in that are that the output_writer_private context is allocated and partially filled by the pgrepack output plugin; and that ->relid therein is now always present (rather than only in assertion-enabled builds). Other than that it's just minor code rearrangement and added comments. Author: Álvaro Herrera Reported-by: Nikita Kalinin Suggested-by: Zhijie Hou Reviewed-by: Antonin Houska (older version) Reviewed-by: Srinath Reddy Sadipiralla (older version) Discussion: https://postgr.es/m/19500-38a02529a69353a5@postgresql.org --- diff --git a/contrib/test_decoding/expected/repack.out b/contrib/test_decoding/expected/repack.out index 6204e620b43..c4ff41be690 100644 --- a/contrib/test_decoding/expected/repack.out +++ b/contrib/test_decoding/expected/repack.out @@ -101,3 +101,14 @@ DETAIL: REPACK (CONCURRENTLY) does not support deferrable primary keys. HINT: Use ALTER TABLE ... REPLICA IDENTITY USING INDEX to designate another index as replica identity. -- clean up DROP TABLE repack_conc_replident, clstrpart; +-- verify that the pgrepack plugin cannot be called directly +CREATE TABLE repack_plugin (a int); +SELECT * FROM pg_create_logical_replication_slot('s_repack', 'pgrepack'); +ERROR: unsupported use of logical decoding plugin "pgrepack" +DETAIL: This plugin can only be used by REPACK (CONCURRENTLY). +CONTEXT: slot "s_repack", output plugin "pgrepack", in the startup callback +INSERT INTO repack_plugin VALUES (1); +SELECT * FROM pg_logical_slot_get_binary_changes('s_repack', NULL, NULL); +ERROR: replication slot "s_repack" does not exist +SELECT pg_drop_replication_slot('s_repack'); +ERROR: replication slot "s_repack" does not exist diff --git a/contrib/test_decoding/sql/repack.sql b/contrib/test_decoding/sql/repack.sql index cea3bd33689..f461f5479f4 100644 --- a/contrib/test_decoding/sql/repack.sql +++ b/contrib/test_decoding/sql/repack.sql @@ -75,3 +75,10 @@ REPACK (CONCURRENTLY) repack_conc_replident; -- clean up DROP TABLE repack_conc_replident, clstrpart; + +-- verify that the pgrepack plugin cannot be called directly +CREATE TABLE repack_plugin (a int); +SELECT * FROM pg_create_logical_replication_slot('s_repack', 'pgrepack'); +INSERT INTO repack_plugin VALUES (1); +SELECT * FROM pg_logical_slot_get_binary_changes('s_repack', NULL, NULL); +SELECT pg_drop_replication_slot('s_repack'); diff --git a/src/backend/commands/repack_worker.c b/src/backend/commands/repack_worker.c index b6b7b604b4f..db9ff057cc6 100644 --- a/src/backend/commands/repack_worker.c +++ b/src/backend/commands/repack_worker.c @@ -28,7 +28,7 @@ #include "tcop/tcopprot.h" #include "utils/memutils.h" -#define REPL_PLUGIN_NAME "pgrepack" +#define PGREPACK_PLUGIN "pgrepack" static void RepackWorkerShutdown(int code, Datum arg); static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid); @@ -197,7 +197,7 @@ repack_setup_logical_decoding(Oid relid) Relation rel; Oid toastrelid; LogicalDecodingContext *ctx; - NameData slotname; + char slotname[NAMEDATALEN]; RepackDecodingState *dstate; MemoryContext oldcxt; @@ -207,43 +207,26 @@ repack_setup_logical_decoding(Oid relid) */ Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny())); - /* - * Make sure we can use logical decoding. - */ + /* Make sure we can use logical decoding */ CheckLogicalDecodingRequirements(true); /* - * A single backend should not execute multiple REPACK commands at a time, - * so use PID to make the slot unique. + * Create the replication slot we'll use, and enable logical decoding in + * case it isn't already on. * - * RS_TEMPORARY so that the slot gets cleaned up on ERROR. + * Make the slot RS_TEMPORARY so that it's removed on ERROR. A backend + * cannot execute multiple REPACK commands at a time, so the PID is enough + * to make the slot name unique. */ - snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid); - ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true, + snprintf(slotname, NAMEDATALEN, "pg_repack_%d", MyProcPid); + ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true, false, false); - EnsureLogicalDecodingEnabled(); /* - * Neither prepare_write nor do_write callback nor update_progress is - * useful for us. - */ - ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME, - NIL, - true, - true, - InvalidXLogRecPtr, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), - NULL, NULL, NULL); - - /* - * We don't have control on setting fast_forward, so at least check it. + * Set up repacked_rel_locator and repacked_rel_toast_locator, which we + * use to skip decoding of unrelated relations. */ - Assert(!ctx->fast_forward); - - /* Avoid logical decoding of other relations. */ rel = table_open(relid, AccessShareLock); repacked_rel_locator = rel->rd_locator; toastrelid = rel->rd_rel->reltoastrelid; @@ -258,15 +241,35 @@ repack_setup_logical_decoding(Oid relid) } table_close(rel, AccessShareLock); - DecodingContextFindStartpoint(ctx); - /* - * decode_concurrent_changes() needs non-blocking callback. + * Set up our logical decoding context. We initially use the blocking + * read_local_xlog_page until we find the start point, and switch to the + * non-blocking interface afterwards. */ - ctx->reader->routine.page_read = read_local_xlog_page_no_wait; + ctx = CreateInitDecodingContext(PGREPACK_PLUGIN, + NIL, + true, + true, + InvalidXLogRecPtr, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); - /* Some WAL records should have been read. */ - Assert(XLogRecPtrIsValid(ctx->reader->EndRecPtr)); + /* Complete setup of output_writer_private */ + dstate = (RepackDecodingState *) ctx->output_writer_private; + dstate->relid = relid; + dstate->worker_cxt = CurrentMemoryContext; + dstate->worker_resowner = CurrentResourceOwner; + + /* We don't have control on fast_forward, but verify it's sane */ + Assert(!ctx->fast_forward); + + /* Find our decoding starting point. */ + DecodingContextFindStartpoint(ctx); + + /* From this point on, we need non-blocking WAL reads */ + ctx->reader->routine.page_read = read_local_xlog_page_no_wait; /* * Initialize repack_current_segment so that we can notice WAL segment @@ -275,36 +278,15 @@ repack_setup_logical_decoding(Oid relid) XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment, wal_segment_size); - /* Our private state belongs to the decoding context. */ - oldcxt = MemoryContextSwitchTo(ctx->context); - /* - * read_local_xlog_page_no_wait() needs to be able to indicate the end of - * WAL. + * Set up our reader private state to let the page-read callback notify + * when end-of-WAL has been reached. This lives in the same context as + * the logical decoding itself. */ + oldcxt = MemoryContextSwitchTo(ctx->context); ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate); - dstate = palloc0_object(RepackDecodingState); MemoryContextSwitchTo(oldcxt); -#ifdef USE_ASSERT_CHECKING - dstate->relid = relid; -#endif - - dstate->change_cxt = AllocSetContextCreate(ctx->context, - "REPACK - change", - ALLOCSET_DEFAULT_SIZES); - - /* The file will be set as soon as we have it opened. */ - dstate->file = NULL; - - /* - * Memory context and resource owner for long-lived resources. - */ - dstate->worker_cxt = CurrentMemoryContext; - dstate->worker_resowner = CurrentResourceOwner; - - ctx->output_writer_private = dstate; - return ctx; } diff --git a/src/backend/replication/pgrepack/pgrepack.c b/src/backend/replication/pgrepack/pgrepack.c index a2615ce54c1..959551f5724 100644 --- a/src/backend/replication/pgrepack/pgrepack.c +++ b/src/backend/replication/pgrepack/pgrepack.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/detoast.h" +#include "commands/repack.h" #include "commands/repack_internal.h" #include "replication/snapbuild.h" #include "utils/memutils.h" @@ -47,7 +48,24 @@ static void repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { - ctx->output_plugin_private = NULL; + RepackDecodingState *dstate; + + if (!AmRepackWorker()) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unsupported use of logical decoding plugin \"%s\"", + "pgrepack"), + errdetail("This plugin can only be used by %s.", + "REPACK (CONCURRENTLY)")); + + /* Initial setup of our private state */ + Assert(CurrentMemoryContext == ctx->context); + dstate = palloc0_object(RepackDecodingState); + dstate->change_cxt = AllocSetContextCreate(ctx->context, + "REPACK - change", + ALLOCSET_DEFAULT_SIZES); + /* repack_setup_logical_decoding fills in the rest */ + ctx->output_writer_private = dstate; /* Probably unnecessary, as we don't use the SQL interface ... */ opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; diff --git a/src/include/commands/repack_internal.h b/src/include/commands/repack_internal.h index 6a85cee8910..42111aa4ae3 100644 --- a/src/include/commands/repack_internal.h +++ b/src/include/commands/repack_internal.h @@ -39,10 +39,8 @@ typedef char ConcurrentChangeKind; */ typedef struct RepackDecodingState { -#ifdef USE_ASSERT_CHECKING /* The relation whose changes we're decoding. */ Oid relid; -#endif /* Per-change memory context. */ MemoryContext change_cxt;