]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Reserve replication slots specifically for REPACK
authorÁlvaro Herrera <alvherre@kurilemu.de>
Tue, 7 Apr 2026 14:55:29 +0000 (16:55 +0200)
committerÁlvaro Herrera <alvherre@kurilemu.de>
Tue, 7 Apr 2026 14:55:29 +0000 (16:55 +0200)
Add a new GUC max_repack_replication_slots, which lets the user reserve
some additional replication slots for concurrent repack (and only
concurrent repack).  With this, the user doesn't have to worry about
changing the max_replication_slots in order to cater for use of
concurrent repack.

(We still use the same pool of bgworkers though, but that's less
commonly a problem than slots.)

Author: Álvaro Herrera <alvherre@kurilemu.de>
Reviewed-by: Srinath Reddy Sadipiralla <srinath2133@gmail.com>
Discussion: https://postgr.es/m/202604012148.nnnmyxxrr6nh@alvherre.pgsql

14 files changed:
doc/src/sgml/config.sgml
doc/src/sgml/ref/repack.sgml
src/backend/commands/repack_worker.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/logical/slotsync.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/backend/utils/misc/guc_parameters.dat
src/backend/utils/misc/postgresql.conf.sample
src/include/replication/logical.h
src/include/replication/slot.h

index 3324d2d3c49e1ef3509908d1823131dca5ff85ae..584bc9f49ddbed582ee7190aaa6d48f955fec3b2 100644 (file)
@@ -4639,6 +4639,21 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-max-repack-replication-slots" xreflabel="max_repack_replication_slots">
+       <term><varname>max_repack_replication_slots</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_repack_replication_slots</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Specifies the maximum number of replication slots for use of
+         the <command>REPACK</command> command.  The default is 5.
+         This parameter can only be set at server start.
+        </para>
+       </listitem>
+      </varlistentry>
+
       <varlistentry id="guc-max-replication-slots" xreflabel="max_replication_slots">
        <term><varname>max_replication_slots</varname> (<type>integer</type>)
        <indexterm>
index ac020406d11436325060a480e6d5b9d26970a291..0cb72b6b2899493236be8ae6b3e0fdeee393f629 100644 (file)
@@ -293,9 +293,9 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING
 
        <listitem>
         <para>
-         The <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
-         configuration parameter does not allow for creation of an additional
-         replication slot.
+         The <link linkend="guc-max-repack-replication-slots"><varname>max_repack_replication_slots</varname></link>
+         configuration parameter does not allow for the creation of an
+         additional replication slot.
         </para>
        </listitem>
       </itemizedlist>
index d06bc1dd270cbaf9f4ab2ccf638e17894f2ce73c..5bd020e018439c9176f421dba13e48a20f9d5d75 100644 (file)
@@ -212,7 +212,7 @@ repack_setup_logical_decoding(Oid relid)
         * Make sure we can use logical decoding.
         */
        CheckSlotPermissions();
-       CheckLogicalDecodingRequirements();
+       CheckLogicalDecodingRequirements(true);
 
        /*
         * A single backend should not execute multiple REPACK commands at a time,
@@ -221,8 +221,8 @@ repack_setup_logical_decoding(Oid relid)
         * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
         */
        snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
-       ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false,
-                                                 false);
+       ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
+                                                 false, false);
 
        EnsureLogicalDecodingEnabled();
 
@@ -233,6 +233,7 @@ repack_setup_logical_decoding(Oid relid)
        ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
                                                                        NIL,
                                                                        true,
+                                                                       true,
                                                                        InvalidXLogRecPtr,
                                                                        XL_ROUTINE(.page_read = read_local_xlog_page,
                                                                                           .segment_open = wal_segment_open,
index 9e75a3e04ee1416319d77c5d67a400a4938d7844..7adf4dbe0d143a2dff518f7cb54275ffb60bcd46 100644 (file)
@@ -1575,7 +1575,7 @@ CreateConflictDetectionSlot(void)
                        errmsg("creating replication conflict detection slot"));
 
        ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
-                                                 false, false);
+                                                 false, false, false);
 
        init_conflict_slot_xmin();
 }
index 8ceaf64d164e78dadfa216fcebb38fa8a411d587..d8e02c535582dda72cc6f18d04caebcdfdd92a48 100644 (file)
@@ -108,9 +108,9 @@ static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugi
  * decoding.
  */
 void
-CheckLogicalDecodingRequirements(void)
+CheckLogicalDecodingRequirements(bool repack)
 {
-       CheckSlotRequirements();
+       CheckSlotRequirements(repack);
 
        /*
         * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
@@ -304,6 +304,7 @@ StartupDecodingContext(List *output_plugin_options,
  * output_plugin_options -- contains options passed to the output plugin
  * need_full_snapshot -- if true, must obtain a snapshot able to read all
  *             tables; if false, one that can read only catalogs is acceptable.
+ * for_repack -- if true, we're going to be decoding for REPACK.
  * restart_lsn -- if given as invalid, it's this routine's responsibility to
  *             mark WAL as reserved by setting a convenient restart_lsn for the slot.
  *             Otherwise, we set for decoding to start from the given LSN without
@@ -324,6 +325,7 @@ LogicalDecodingContext *
 CreateInitDecodingContext(const char *plugin,
                                                  List *output_plugin_options,
                                                  bool need_full_snapshot,
+                                                 bool for_repack,
                                                  XLogRecPtr restart_lsn,
                                                  XLogReaderRoutine *xl_routine,
                                                  LogicalOutputPluginWriterPrepareWrite prepare_write,
@@ -340,7 +342,7 @@ CreateInitDecodingContext(const char *plugin,
         * On a standby, this check is also required while creating the slot.
         * Check the comments in the function.
         */
-       CheckLogicalDecodingRequirements();
+       CheckLogicalDecodingRequirements(for_repack);
 
        /* shorter lines... */
        slot = MyReplicationSlot;
index 9760818941d9deacd1ff0a2315a6533811b50a56..512013b0ef0d1de0a96beaf9fdff1203be0af080 100644 (file)
@@ -115,7 +115,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 
        CheckSlotPermissions();
 
-       CheckLogicalDecodingRequirements();
+       CheckLogicalDecodingRequirements(false);
 
        if (PG_ARGISNULL(0))
                ereport(ERROR,
index 8b53bd3ac7f948b098431149e8275bbdc5d7f006..ae900f13467605f9f3c0a540c965dcdcc17b0bc7 100644 (file)
@@ -434,7 +434,7 @@ get_local_synced_slots(void)
 
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-       for (int i = 0; i < max_replication_slots; i++)
+       for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -823,6 +823,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                 */
                ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
                                                          remote_slot->two_phase,
+                                                         false,
                                                          remote_slot->failover,
                                                          true);
 
@@ -1707,7 +1708,7 @@ update_synced_slots_inactive_since(void)
 
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-       for (int i = 0; i < max_replication_slots; i++)
+       for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
index a1f37e59dbc50fa9221ecbaecba411f83bc8bf62..83fcde74718080ea1135b5614f3d8b0e4132b20e 100644 (file)
@@ -160,6 +160,8 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variables */
 int                    max_replication_slots = 10; /* the maximum number of replication
                                                                                 * slots */
+int                    max_repack_replication_slots = 5;       /* the maximum number of slots
+                                                                                                * for REPACK */
 
 /*
  * Invalidate replication slots that have remained idle longer than this
@@ -199,12 +201,13 @@ ReplicationSlotsShmemRequest(void *arg)
 {
        Size            size;
 
-       if (max_replication_slots == 0)
+       if (max_replication_slots + max_repack_replication_slots == 0)
                return;
 
        size = offsetof(ReplicationSlotCtlData, replication_slots);
        size = add_size(size,
-                                       mul_size(max_replication_slots, sizeof(ReplicationSlot)));
+                                       mul_size(max_replication_slots + max_repack_replication_slots,
+                                                        sizeof(ReplicationSlot)));
        ShmemRequestStruct(.name = "ReplicationSlot Ctl",
                                           .size = size,
                                           .ptr = (void **) &ReplicationSlotCtl,
@@ -217,7 +220,7 @@ ReplicationSlotsShmemRequest(void *arg)
 static void
 ReplicationSlotsShmemInit(void *arg)
 {
-       for (int i = 0; i < max_replication_slots; i++)
+       for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
 
@@ -366,6 +369,7 @@ IsSlotForConflictCheck(const char *name)
  * db_specific: logical decoding is db specific; if the slot is going to
  *        be used for that pass true, otherwise false.
  * two_phase: If enabled, allows decoding of prepared transactions.
+ * repack: If true, use a slot from the pool for REPACK.
  * failover: If enabled, allows the slot to be synced to standbys so
  *     that logical replication can be resumed after failover.
  * synced: True if the slot is synchronized from the primary server.
@@ -373,10 +377,11 @@ IsSlotForConflictCheck(const char *name)
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
                                          ReplicationSlotPersistency persistency,
-                                         bool two_phase, bool failover, bool synced)
+                                         bool two_phase, bool repack, bool failover, bool synced)
 {
        ReplicationSlot *slot = NULL;
-       int                     i;
+       int                     startpoint,
+                               endpoint;
 
        Assert(MyReplicationSlot == NULL);
 
@@ -425,12 +430,16 @@ ReplicationSlotCreate(const char *name, bool db_specific,
        LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 
        /*
-        * Check for name collision, and identify an allocatable slot.  We need to
-        * hold ReplicationSlotControlLock in shared mode for this, so that nobody
-        * else can change the in_use flags while we're looking at them.
+        * Check for name collision (across the whole array), and identify an
+        * allocatable slot (in the array slice specific to our current use case:
+        * either general, or REPACK only).  We need to hold
+        * ReplicationSlotControlLock in shared mode for this, so that nobody else
+        * can change the in_use flags while we're looking at them.
         */
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       for (i = 0; i < max_replication_slots; i++)
+       startpoint = !repack ? 0 : max_replication_slots;
+       endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
+       for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -438,7 +447,9 @@ ReplicationSlotCreate(const char *name, bool db_specific,
                        ereport(ERROR,
                                        (errcode(ERRCODE_DUPLICATE_OBJECT),
                                         errmsg("replication slot \"%s\" already exists", name)));
-               if (!s->in_use && slot == NULL)
+
+               if (i >= startpoint && i < endpoint &&
+                       !s->in_use && slot == NULL)
                        slot = s;
        }
        LWLockRelease(ReplicationSlotControlLock);
@@ -448,7 +459,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
                ereport(ERROR,
                                (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                 errmsg("all replication slots are in use"),
-                                errhint("Free one or increase \"max_replication_slots\".")));
+                                errhint("Free one or increase \"%s\".",
+                                                repack ? "max_repack_replication_slots" : "max_replication_slots")));
 
        /*
         * Since this slot is not in use, nobody should be looking at any part of
@@ -541,7 +553,7 @@ SearchNamedReplicationSlot(const char *name, bool need_lock)
        if (need_lock)
                LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-       for (i = 0; i < max_replication_slots; i++)
+       for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -569,7 +581,8 @@ int
 ReplicationSlotIndex(ReplicationSlot *slot)
 {
        Assert(slot >= ReplicationSlotCtl->replication_slots &&
-                  slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
+                  slot < ReplicationSlotCtl->replication_slots +
+                  (max_replication_slots + max_repack_replication_slots));
 
        return slot - ReplicationSlotCtl->replication_slots;
 }
@@ -863,7 +876,7 @@ ReplicationSlotCleanup(bool synced_only)
 restart:
        found_valid_logicalslot = false;
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       for (i = 0; i < max_replication_slots; i++)
+       for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -1245,7 +1258,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
        if (!already_locked)
                LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-       for (i = 0; i < max_replication_slots; i++)
+       for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
                TransactionId effective_xmin;
@@ -1300,7 +1313,7 @@ ReplicationSlotsComputeRequiredLSN(void)
        Assert(ReplicationSlotCtl != NULL);
 
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       for (i = 0; i < max_replication_slots; i++)
+       for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
                XLogRecPtr      restart_lsn;
@@ -1367,12 +1380,12 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
        XLogRecPtr      result = InvalidXLogRecPtr;
        int                     i;
 
-       if (max_replication_slots <= 0)
+       if (max_replication_slots + max_repack_replication_slots <= 0)
                return InvalidXLogRecPtr;
 
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-       for (i = 0; i < max_replication_slots; i++)
+       for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s;
                XLogRecPtr      restart_lsn;
@@ -1447,11 +1460,11 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 
        *nslots = *nactive = 0;
 
-       if (max_replication_slots <= 0)
+       if (max_replication_slots + max_repack_replication_slots <= 0)
                return false;
 
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       for (i = 0; i < max_replication_slots; i++)
+       for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s;
 
@@ -1508,13 +1521,13 @@ ReplicationSlotsDropDBSlots(Oid dboid)
        bool            found_valid_logicalslot;
        bool            dropped = false;
 
-       if (max_replication_slots <= 0)
+       if (max_replication_slots + max_repack_replication_slots <= 0)
                return;
 
 restart:
        found_valid_logicalslot = false;
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       for (i = 0; i < max_replication_slots; i++)
+       for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s;
                char       *slotname;
@@ -1611,11 +1624,11 @@ CheckLogicalSlotExists(void)
 {
        bool            found = false;
 
-       if (max_replication_slots <= 0)
+       if (max_replication_slots + max_repack_replication_slots <= 0)
                return false;
 
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       for (int i = 0; i < max_replication_slots; i++)
+       for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s;
                bool            invalidated;
@@ -1649,17 +1662,24 @@ CheckLogicalSlotExists(void)
  * slots.
  */
 void
-CheckSlotRequirements(void)
+CheckSlotRequirements(bool repack)
 {
        /*
         * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
         * needs the same check.
         */
 
-       if (max_replication_slots == 0)
+       if (!repack && max_replication_slots == 0)
                ereport(ERROR,
-                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
+                               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                               errmsg("replication slots can only be used if \"%s\" > 0",
+                                          "max_replication_slots"));
+
+       if (repack && max_repack_replication_slots == 0)
+               ereport(ERROR,
+                               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                               errmsg("REPACK can only be used if \"%s\" > 0",
+                                          "max_repack_replication_slots"));
 
        if (wal_level < WAL_LEVEL_REPLICA)
                ereport(ERROR,
@@ -2210,7 +2230,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
        Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
        Assert(possible_causes != RS_INVAL_NONE);
 
-       if (max_replication_slots == 0)
+       if (max_replication_slots == 0 && max_repack_replication_slots == 0)
                return invalidated;
 
        XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
@@ -2218,7 +2238,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
 restart:
        found_valid_logicalslot = false;
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       for (int i = 0; i < max_replication_slots; i++)
+       for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
                bool            released_lock = false;
@@ -2323,7 +2343,7 @@ CheckPointReplicationSlots(bool is_shutdown)
         */
        LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
 
-       for (i = 0; i < max_replication_slots; i++)
+       for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
                char            path[MAXPGPATH];
@@ -2424,7 +2444,7 @@ StartupReplicationSlots(void)
        FreeDir(replication_dir);
 
        /* currently no slots exist, we're done. */
-       if (max_replication_slots <= 0)
+       if (max_replication_slots + max_repack_replication_slots <= 0)
                return;
 
        /* Now that we have recovered all the data, compute replication xmin */
@@ -2853,7 +2873,13 @@ RestoreSlotFromDisk(const char *name)
                                                NameStr(cp.slotdata.name)),
                                 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
 
-       /* nothing can be active yet, don't lock anything */
+       /*
+        * Nothing can be active yet, don't lock anything.  Note we iterate up to
+        * max_replication_slots instead of adding max_repack_replication_slots as
+        * in all other places, because we must enforce the GUC value in case
+        * there were more slots before the shutdown than what it is set up to
+        * now.
+        */
        for (i = 0; i < max_replication_slots; i++)
        {
                ReplicationSlot *slot;
index 9f5e4f998fe9d8bfb92006eb0e52ee604ab81814..16fbd3837359392fadb0e12243ec01b7b51c1ad7 100644 (file)
@@ -53,7 +53,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
        /* acquire replication slot, this will check for conflicting names */
        ReplicationSlotCreate(name, false,
                                                  temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
-                                                 false, false);
+                                                 false, false, false);
 
        if (immediately_reserve)
        {
@@ -90,7 +90,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 
        CheckSlotPermissions();
 
-       CheckSlotRequirements();
+       CheckSlotRequirements(false);
 
        create_physical_replication_slot(NameStr(*name),
                                                                         immediately_reserve,
@@ -146,7 +146,7 @@ create_logical_replication_slot(char *name, char *plugin,
         */
        ReplicationSlotCreate(name, true,
                                                  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
-                                                 failover, false);
+                                                 false, failover, false);
 
        /*
         * Ensure the logical decoding is enabled before initializing the logical
@@ -164,6 +164,7 @@ create_logical_replication_slot(char *name, char *plugin,
         */
        ctx = CreateInitDecodingContext(plugin, NIL,
                                                                        false,  /* just catalogs is OK */
+                                                                       false,  /* not repack */
                                                                        restart_lsn,
                                                                        XL_ROUTINE(.page_read = read_local_xlog_page,
                                                                                           .segment_open = wal_segment_open,
@@ -203,7 +204,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 
        CheckSlotPermissions();
 
-       CheckLogicalDecodingRequirements();
+       CheckLogicalDecodingRequirements(false);
 
        create_logical_replication_slot(NameStr(*name),
                                                                        NameStr(*plugin),
@@ -240,7 +241,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 
        CheckSlotPermissions();
 
-       CheckSlotRequirements();
+       CheckSlotRequirements(false);
 
        ReplicationSlotDrop(NameStr(*name), true);
 
@@ -270,7 +271,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
        currlsn = GetXLogWriteRecPtr();
 
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-       for (slotno = 0; slotno < max_replication_slots; slotno++)
+       for (slotno = 0; slotno < max_replication_slots + max_repack_replication_slots; slotno++)
        {
                ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
                ReplicationSlot slot_contents;
@@ -648,9 +649,9 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
        CheckSlotPermissions();
 
        if (logical_slot)
-               CheckLogicalDecodingRequirements();
+               CheckLogicalDecodingRequirements(false);
        else
-               CheckSlotRequirements();
+               CheckSlotRequirements(false);
 
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
@@ -665,7 +666,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
         * managed to create the new slot, we advance the new slot's restart_lsn
         * to the source slot's updated restart_lsn the second time we lock it.
         */
-       for (int i = 0; i < max_replication_slots; i++)
+       for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
index b4a2117a7f983664de8bcd6f7ce1484285d71ab2..bad45adb004fb9575e09ce10be7b621ced78889f 100644 (file)
@@ -1241,7 +1241,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
        {
                ReplicationSlotCreate(cmd->slotname, false,
                                                          cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-                                                         false, false, false);
+                                                         false, false, false, false);
 
                if (reserve_wal)
                {
@@ -1261,7 +1261,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
                Assert(cmd->kind == REPLICATION_KIND_LOGICAL);
 
-               CheckLogicalDecodingRequirements();
+               CheckLogicalDecodingRequirements(false);
 
                /*
                 * Initially create persistent slot as ephemeral - that allows us to
@@ -1272,7 +1272,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
                 */
                ReplicationSlotCreate(cmd->slotname, true,
                                                          cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-                                                         two_phase, failover, false);
+                                                         two_phase, false, failover, false);
 
                /*
                 * Do options check early so that we can bail before calling the
@@ -1330,6 +1330,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
                Assert(IsLogicalDecodingEnabled());
 
                ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
+                                                                               false,
                                                                                InvalidXLogRecPtr,
                                                                                XL_ROUTINE(.page_read = logical_read_xlog_page,
                                                                                                   .segment_open = WalSndSegmentOpen,
@@ -1487,7 +1488,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
        QueryCompletion qc;
 
        /* make sure that our requirements are still fulfilled */
-       CheckLogicalDecodingRequirements();
+       CheckLogicalDecodingRequirements(false);
 
        Assert(!MyReplicationSlot);
 
index fcb6ab805830961205d50ed2a4f62904a5cd9003..632f3ba4989c9ff5a58b95f27c90bd373ffc5fbd 100644 (file)
   max => 'MAX_BACKENDS',
 },
 
+{ name => 'max_repack_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
+  short_desc => 'Sets the maximum number of replication slots for use by REPACK.',
+  variable => 'max_repack_replication_slots',
+  boot_val => '5',
+  min => '0',
+  max => 'MAX_BACKENDS',
+},
+
 /* see max_wal_senders */
 { name => 'max_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
   short_desc => 'Sets the maximum number of simultaneously defined replication slots.',
index e3e462f3efb906932c97e90e5811e0022e5ff15b..2e10eb4a36a91cd9e86b931363eb05a7958dba98 100644 (file)
                                 # (change requires restart)
 #max_replication_slots = 10     # max number of replication slots
                                 # (change requires restart)
+#max_repack_replication_slots = 5 # max number of replication slots for REPACK
+                                # (change requires restart)
 #wal_keep_size = 0              # in megabytes; 0 disables
 #max_slot_wal_keep_size = -1    # in megabytes; -1 disables
 #idle_replication_slot_timeout = 0      # in seconds; 0 disables
index bc9d4ece67261445fbf6804a7fa6e971f49f5167..bc075b1674175f65d7d12a0ae833e71c782a1ba9 100644 (file)
@@ -115,11 +115,12 @@ typedef struct LogicalDecodingContext
 } LogicalDecodingContext;
 
 
-extern void CheckLogicalDecodingRequirements(void);
+extern void CheckLogicalDecodingRequirements(bool repack);
 
 extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
                                                                                                                 List *output_plugin_options,
                                                                                                                 bool need_full_snapshot,
+                                                                                                                bool for_repack,
                                                                                                                 XLogRecPtr restart_lsn,
                                                                                                                 XLogReaderRoutine *xl_routine,
                                                                                                                 LogicalOutputPluginWriterPrepareWrite prepare_write,
index 1a3557de6070ab847c37ae0997965e8021d46487..77c8d0975b6826e244e83cda68885bc7096f31d6 100644 (file)
@@ -324,13 +324,14 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT int max_repack_replication_slots;
 extern PGDLLIMPORT char *synchronized_standby_slots;
 extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
 
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
                                                                  ReplicationSlotPersistency persistency,
-                                                                 bool two_phase, bool failover,
+                                                                 bool two_phase, bool repack, bool failover,
                                                                  bool synced);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
@@ -373,7 +374,7 @@ extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(bool is_shutdown);
 
-extern void CheckSlotRequirements(void);
+extern void CheckSlotRequirements(bool repack);
 extern void CheckSlotPermissions(void);
 extern ReplicationSlotInvalidationCause
                        GetSlotInvalidationCause(const char *cause_name);