From e76d8c749c3152657711ed733f0aea61c0e36a91 Mon Sep 17 00:00:00 2001 From: =?utf8?q?=C3=81lvaro=20Herrera?= Date: Tue, 7 Apr 2026 16:55:29 +0200 Subject: [PATCH] Reserve replication slots specifically for REPACK MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Add a new GUC max_repack_replication_slots, which lets the user reserve some additional replication slots for concurrent repack (and only concurrent repack). With this, the user doesn't have to worry about changing the max_replication_slots in order to cater for use of concurrent repack. (We still use the same pool of bgworkers though, but that's less commonly a problem than slots.) Author: Álvaro Herrera Reviewed-by: Srinath Reddy Sadipiralla Discussion: https://postgr.es/m/202604012148.nnnmyxxrr6nh@alvherre.pgsql --- doc/src/sgml/config.sgml | 15 +++ doc/src/sgml/ref/repack.sgml | 6 +- src/backend/commands/repack_worker.c | 7 +- src/backend/replication/logical/launcher.c | 2 +- src/backend/replication/logical/logical.c | 8 +- .../replication/logical/logicalfuncs.c | 2 +- src/backend/replication/logical/slotsync.c | 5 +- src/backend/replication/slot.c | 92 ++++++++++++------- src/backend/replication/slotfuncs.c | 19 ++-- src/backend/replication/walsender.c | 9 +- src/backend/utils/misc/guc_parameters.dat | 8 ++ src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/replication/logical.h | 3 +- src/include/replication/slot.h | 5 +- 14 files changed, 121 insertions(+), 62 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 3324d2d3c49..584bc9f49dd 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4639,6 +4639,21 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + max_repack_replication_slots (integer) + + max_repack_replication_slots configuration parameter + + + + + Specifies the maximum number of replication slots for use of + the REPACK command. The default is 5. + This parameter can only be set at server start. + + + + max_replication_slots (integer) diff --git a/doc/src/sgml/ref/repack.sgml b/doc/src/sgml/ref/repack.sgml index ac020406d11..0cb72b6b289 100644 --- a/doc/src/sgml/ref/repack.sgml +++ b/doc/src/sgml/ref/repack.sgml @@ -293,9 +293,9 @@ REPACK [ ( option [, ...] ) ] USING - The max_replication_slots - configuration parameter does not allow for creation of an additional - replication slot. + The max_repack_replication_slots + configuration parameter does not allow for the creation of an + additional replication slot. diff --git a/src/backend/commands/repack_worker.c b/src/backend/commands/repack_worker.c index d06bc1dd270..5bd020e0184 100644 --- a/src/backend/commands/repack_worker.c +++ b/src/backend/commands/repack_worker.c @@ -212,7 +212,7 @@ repack_setup_logical_decoding(Oid relid) * Make sure we can use logical decoding. */ CheckSlotPermissions(); - CheckLogicalDecodingRequirements(); + CheckLogicalDecodingRequirements(true); /* * A single backend should not execute multiple REPACK commands at a time, @@ -221,8 +221,8 @@ repack_setup_logical_decoding(Oid relid) * RS_TEMPORARY so that the slot gets cleaned up on ERROR. */ snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid); - ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false, - false); + ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true, + false, false); EnsureLogicalDecodingEnabled(); @@ -233,6 +233,7 @@ repack_setup_logical_decoding(Oid relid) ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME, NIL, true, + true, InvalidXLogRecPtr, XL_ROUTINE(.page_read = read_local_xlog_page, .segment_open = wal_segment_open, diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 9e75a3e04ee..7adf4dbe0d1 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1575,7 +1575,7 @@ CreateConflictDetectionSlot(void) errmsg("creating replication conflict detection slot")); ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, - false, false); + false, false, false); init_conflict_slot_xmin(); } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8ceaf64d164..d8e02c53558 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -108,9 +108,9 @@ static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugi * decoding. */ void -CheckLogicalDecodingRequirements(void) +CheckLogicalDecodingRequirements(bool repack) { - CheckSlotRequirements(); + CheckSlotRequirements(repack); /* * NB: Adding a new requirement likely means that RestoreSlotFromDisk() @@ -304,6 +304,7 @@ StartupDecodingContext(List *output_plugin_options, * output_plugin_options -- contains options passed to the output plugin * need_full_snapshot -- if true, must obtain a snapshot able to read all * tables; if false, one that can read only catalogs is acceptable. + * for_repack -- if true, we're going to be decoding for REPACK. * restart_lsn -- if given as invalid, it's this routine's responsibility to * mark WAL as reserved by setting a convenient restart_lsn for the slot. * Otherwise, we set for decoding to start from the given LSN without @@ -324,6 +325,7 @@ LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, + bool for_repack, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, @@ -340,7 +342,7 @@ CreateInitDecodingContext(const char *plugin, * On a standby, this check is also required while creating the slot. * Check the comments in the function. */ - CheckLogicalDecodingRequirements(); + CheckLogicalDecodingRequirements(for_repack); /* shorter lines... */ slot = MyReplicationSlot; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 9760818941d..512013b0ef0 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -115,7 +115,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin CheckSlotPermissions(); - CheckLogicalDecodingRequirements(); + CheckLogicalDecodingRequirements(false); if (PG_ARGISNULL(0)) ereport(ERROR, diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 8b53bd3ac7f..ae900f13467 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -434,7 +434,7 @@ get_local_synced_slots(void) LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (int i = 0; i < max_replication_slots; i++) + for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; @@ -823,6 +823,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, */ ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, remote_slot->two_phase, + false, remote_slot->failover, true); @@ -1707,7 +1708,7 @@ update_synced_slots_inactive_since(void) LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (int i = 0; i < max_replication_slots; i++) + for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index a1f37e59dbc..83fcde74718 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -160,6 +160,8 @@ ReplicationSlot *MyReplicationSlot = NULL; /* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +int max_repack_replication_slots = 5; /* the maximum number of slots + * for REPACK */ /* * Invalidate replication slots that have remained idle longer than this @@ -199,12 +201,13 @@ ReplicationSlotsShmemRequest(void *arg) { Size size; - if (max_replication_slots == 0) + if (max_replication_slots + max_repack_replication_slots == 0) return; size = offsetof(ReplicationSlotCtlData, replication_slots); size = add_size(size, - mul_size(max_replication_slots, sizeof(ReplicationSlot))); + mul_size(max_replication_slots + max_repack_replication_slots, + sizeof(ReplicationSlot))); ShmemRequestStruct(.name = "ReplicationSlot Ctl", .size = size, .ptr = (void **) &ReplicationSlotCtl, @@ -217,7 +220,7 @@ ReplicationSlotsShmemRequest(void *arg) static void ReplicationSlotsShmemInit(void *arg) { - for (int i = 0; i < max_replication_slots; i++) + for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i]; @@ -366,6 +369,7 @@ IsSlotForConflictCheck(const char *name) * db_specific: logical decoding is db specific; if the slot is going to * be used for that pass true, otherwise false. * two_phase: If enabled, allows decoding of prepared transactions. + * repack: If true, use a slot from the pool for REPACK. * failover: If enabled, allows the slot to be synced to standbys so * that logical replication can be resumed after failover. * synced: True if the slot is synchronized from the primary server. @@ -373,10 +377,11 @@ IsSlotForConflictCheck(const char *name) void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, bool synced) + bool two_phase, bool repack, bool failover, bool synced) { ReplicationSlot *slot = NULL; - int i; + int startpoint, + endpoint; Assert(MyReplicationSlot == NULL); @@ -425,12 +430,16 @@ ReplicationSlotCreate(const char *name, bool db_specific, LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); /* - * Check for name collision, and identify an allocatable slot. We need to - * hold ReplicationSlotControlLock in shared mode for this, so that nobody - * else can change the in_use flags while we're looking at them. + * Check for name collision (across the whole array), and identify an + * allocatable slot (in the array slice specific to our current use case: + * either general, or REPACK only). We need to hold + * ReplicationSlotControlLock in shared mode for this, so that nobody else + * can change the in_use flags while we're looking at them. */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + startpoint = !repack ? 0 : max_replication_slots; + endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0); + for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; @@ -438,7 +447,9 @@ ReplicationSlotCreate(const char *name, bool db_specific, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("replication slot \"%s\" already exists", name))); - if (!s->in_use && slot == NULL) + + if (i >= startpoint && i < endpoint && + !s->in_use && slot == NULL) slot = s; } LWLockRelease(ReplicationSlotControlLock); @@ -448,7 +459,8 @@ ReplicationSlotCreate(const char *name, bool db_specific, ereport(ERROR, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("all replication slots are in use"), - errhint("Free one or increase \"max_replication_slots\"."))); + errhint("Free one or increase \"%s\".", + repack ? "max_repack_replication_slots" : "max_replication_slots"))); /* * Since this slot is not in use, nobody should be looking at any part of @@ -541,7 +553,7 @@ SearchNamedReplicationSlot(const char *name, bool need_lock) if (need_lock) LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; @@ -569,7 +581,8 @@ int ReplicationSlotIndex(ReplicationSlot *slot) { Assert(slot >= ReplicationSlotCtl->replication_slots && - slot < ReplicationSlotCtl->replication_slots + max_replication_slots); + slot < ReplicationSlotCtl->replication_slots + + (max_replication_slots + max_repack_replication_slots)); return slot - ReplicationSlotCtl->replication_slots; } @@ -863,7 +876,7 @@ ReplicationSlotCleanup(bool synced_only) restart: found_valid_logicalslot = false; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; @@ -1245,7 +1258,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) if (!already_locked) LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; TransactionId effective_xmin; @@ -1300,7 +1313,7 @@ ReplicationSlotsComputeRequiredLSN(void) Assert(ReplicationSlotCtl != NULL); LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn; @@ -1367,12 +1380,12 @@ ReplicationSlotsComputeLogicalRestartLSN(void) XLogRecPtr result = InvalidXLogRecPtr; int i; - if (max_replication_slots <= 0) + if (max_replication_slots + max_repack_replication_slots <= 0) return InvalidXLogRecPtr; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s; XLogRecPtr restart_lsn; @@ -1447,11 +1460,11 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) *nslots = *nactive = 0; - if (max_replication_slots <= 0) + if (max_replication_slots + max_repack_replication_slots <= 0) return false; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s; @@ -1508,13 +1521,13 @@ ReplicationSlotsDropDBSlots(Oid dboid) bool found_valid_logicalslot; bool dropped = false; - if (max_replication_slots <= 0) + if (max_replication_slots + max_repack_replication_slots <= 0) return; restart: found_valid_logicalslot = false; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s; char *slotname; @@ -1611,11 +1624,11 @@ CheckLogicalSlotExists(void) { bool found = false; - if (max_replication_slots <= 0) + if (max_replication_slots + max_repack_replication_slots <= 0) return false; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (int i = 0; i < max_replication_slots; i++) + for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s; bool invalidated; @@ -1649,17 +1662,24 @@ CheckLogicalSlotExists(void) * slots. */ void -CheckSlotRequirements(void) +CheckSlotRequirements(bool repack) { /* * NB: Adding a new requirement likely means that RestoreSlotFromDisk() * needs the same check. */ - if (max_replication_slots == 0) + if (!repack && max_replication_slots == 0) ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("replication slots can only be used if \"max_replication_slots\" > 0"))); + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slots can only be used if \"%s\" > 0", + "max_replication_slots")); + + if (repack && max_repack_replication_slots == 0) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("REPACK can only be used if \"%s\" > 0", + "max_repack_replication_slots")); if (wal_level < WAL_LEVEL_REPLICA) ereport(ERROR, @@ -2210,7 +2230,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes, Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0); Assert(possible_causes != RS_INVAL_NONE); - if (max_replication_slots == 0) + if (max_replication_slots == 0 && max_repack_replication_slots == 0) return invalidated; XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); @@ -2218,7 +2238,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes, restart: found_valid_logicalslot = false; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (int i = 0; i < max_replication_slots; i++) + for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; bool released_lock = false; @@ -2323,7 +2343,7 @@ CheckPointReplicationSlots(bool is_shutdown) */ LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) + for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; char path[MAXPGPATH]; @@ -2424,7 +2444,7 @@ StartupReplicationSlots(void) FreeDir(replication_dir); /* currently no slots exist, we're done. */ - if (max_replication_slots <= 0) + if (max_replication_slots + max_repack_replication_slots <= 0) return; /* Now that we have recovered all the data, compute replication xmin */ @@ -2853,7 +2873,13 @@ RestoreSlotFromDisk(const char *name) NameStr(cp.slotdata.name)), errhint("Change \"wal_level\" to be \"replica\" or higher."))); - /* nothing can be active yet, don't lock anything */ + /* + * Nothing can be active yet, don't lock anything. Note we iterate up to + * max_replication_slots instead of adding max_repack_replication_slots as + * in all other places, because we must enforce the GUC value in case + * there were more slots before the shutdown than what it is set up to + * now. + */ for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *slot; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 9f5e4f998fe..16fbd383735 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -53,7 +53,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? RS_TEMPORARY : RS_PERSISTENT, false, - false, false); + false, false, false); if (immediately_reserve) { @@ -90,7 +90,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) CheckSlotPermissions(); - CheckSlotRequirements(); + CheckSlotRequirements(false); create_physical_replication_slot(NameStr(*name), immediately_reserve, @@ -146,7 +146,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover, false); + false, failover, false); /* * Ensure the logical decoding is enabled before initializing the logical @@ -164,6 +164,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ctx = CreateInitDecodingContext(plugin, NIL, false, /* just catalogs is OK */ + false, /* not repack */ restart_lsn, XL_ROUTINE(.page_read = read_local_xlog_page, .segment_open = wal_segment_open, @@ -203,7 +204,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) CheckSlotPermissions(); - CheckLogicalDecodingRequirements(); + CheckLogicalDecodingRequirements(false); create_logical_replication_slot(NameStr(*name), NameStr(*plugin), @@ -240,7 +241,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) CheckSlotPermissions(); - CheckSlotRequirements(); + CheckSlotRequirements(false); ReplicationSlotDrop(NameStr(*name), true); @@ -270,7 +271,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) currlsn = GetXLogWriteRecPtr(); LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (slotno = 0; slotno < max_replication_slots; slotno++) + for (slotno = 0; slotno < max_replication_slots + max_repack_replication_slots; slotno++) { ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; ReplicationSlot slot_contents; @@ -648,9 +649,9 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) CheckSlotPermissions(); if (logical_slot) - CheckLogicalDecodingRequirements(); + CheckLogicalDecodingRequirements(false); else - CheckSlotRequirements(); + CheckSlotRequirements(false); LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); @@ -665,7 +666,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) * managed to create the new slot, we advance the new slot's restart_lsn * to the source slot's updated restart_lsn the second time we lock it. */ - for (int i = 0; i < max_replication_slots; i++) + for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index b4a2117a7f9..bad45adb004 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1241,7 +1241,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false, false); + false, false, false, false); if (reserve_wal) { @@ -1261,7 +1261,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(cmd->kind == REPLICATION_KIND_LOGICAL); - CheckLogicalDecodingRequirements(); + CheckLogicalDecodingRequirements(false); /* * Initially create persistent slot as ephemeral - that allows us to @@ -1272,7 +1272,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover, false); + two_phase, false, failover, false); /* * Do options check early so that we can bail before calling the @@ -1330,6 +1330,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(IsLogicalDecodingEnabled()); ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, + false, InvalidXLogRecPtr, XL_ROUTINE(.page_read = logical_read_xlog_page, .segment_open = WalSndSegmentOpen, @@ -1487,7 +1488,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) QueryCompletion qc; /* make sure that our requirements are still fulfilled */ - CheckLogicalDecodingRequirements(); + CheckLogicalDecodingRequirements(false); Assert(!MyReplicationSlot); diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index fcb6ab80583..632f3ba4989 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2079,6 +2079,14 @@ max => 'MAX_BACKENDS', }, +{ name => 'max_repack_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING', + short_desc => 'Sets the maximum number of replication slots for use by REPACK.', + variable => 'max_repack_replication_slots', + boot_val => '5', + min => '0', + max => 'MAX_BACKENDS', +}, + /* see max_wal_senders */ { name => 'max_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING', short_desc => 'Sets the maximum number of simultaneously defined replication slots.', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index e3e462f3efb..2e10eb4a36a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -348,6 +348,8 @@ # (change requires restart) #max_replication_slots = 10 # max number of replication slots # (change requires restart) +#max_repack_replication_slots = 5 # max number of replication slots for REPACK + # (change requires restart) #wal_keep_size = 0 # in megabytes; 0 disables #max_slot_wal_keep_size = -1 # in megabytes; -1 disables #idle_replication_slot_timeout = 0 # in seconds; 0 disables diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index bc9d4ece672..bc075b16741 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -115,11 +115,12 @@ typedef struct LogicalDecodingContext } LogicalDecodingContext; -extern void CheckLogicalDecodingRequirements(void); +extern void CheckLogicalDecodingRequirements(bool repack); extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, + bool for_repack, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 1a3557de607..77c8d0975b6 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -324,13 +324,14 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT int max_repack_replication_slots; extern PGDLLIMPORT char *synchronized_standby_slots; extern PGDLLIMPORT int idle_replication_slot_timeout_secs; /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, + bool two_phase, bool repack, bool failover, bool synced); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); @@ -373,7 +374,7 @@ extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(bool is_shutdown); -extern void CheckSlotRequirements(void); +extern void CheckSlotRequirements(bool repack); extern void CheckSlotPermissions(void); extern ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name); -- 2.47.3