</listitem>
</varlistentry>
+ <varlistentry id="guc-max-repack-replication-slots" xreflabel="max_repack_replication_slots">
+ <term><varname>max_repack_replication_slots</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>max_repack_replication_slots</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Specifies the maximum number of replication slots for use of
+ the <command>REPACK</command> command. The default is 5.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-max-replication-slots" xreflabel="max_replication_slots">
<term><varname>max_replication_slots</varname> (<type>integer</type>)
<indexterm>
<listitem>
<para>
- The <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
- configuration parameter does not allow for creation of an additional
- replication slot.
+ The <link linkend="guc-max-repack-replication-slots"><varname>max_repack_replication_slots</varname></link>
+ configuration parameter does not allow for the creation of an
+ additional replication slot.
</para>
</listitem>
</itemizedlist>
* Make sure we can use logical decoding.
*/
CheckSlotPermissions();
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(true);
/*
* A single backend should not execute multiple REPACK commands at a time,
* RS_TEMPORARY so that the slot gets cleaned up on ERROR.
*/
snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
- ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false,
- false);
+ ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
+ false, false);
EnsureLogicalDecodingEnabled();
ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
NIL,
true,
+ true,
InvalidXLogRecPtr,
XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
errmsg("creating replication conflict detection slot"));
ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
- false, false);
+ false, false, false);
init_conflict_slot_xmin();
}
* decoding.
*/
void
-CheckLogicalDecodingRequirements(void)
+CheckLogicalDecodingRequirements(bool repack)
{
- CheckSlotRequirements();
+ CheckSlotRequirements(repack);
/*
* NB: Adding a new requirement likely means that RestoreSlotFromDisk()
* output_plugin_options -- contains options passed to the output plugin
* need_full_snapshot -- if true, must obtain a snapshot able to read all
* tables; if false, one that can read only catalogs is acceptable.
+ * for_repack -- if true, we're going to be decoding for REPACK.
* restart_lsn -- if given as invalid, it's this routine's responsibility to
* mark WAL as reserved by setting a convenient restart_lsn for the slot.
* Otherwise, we set for decoding to start from the given LSN without
CreateInitDecodingContext(const char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
+ bool for_repack,
XLogRecPtr restart_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
* On a standby, this check is also required while creating the slot.
* Check the comments in the function.
*/
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(for_repack);
/* shorter lines... */
slot = MyReplicationSlot;
CheckSlotPermissions();
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(false);
if (PG_ARGISNULL(0))
ereport(ERROR,
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
*/
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
remote_slot->two_phase,
+ false,
remote_slot->failover,
true);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
/* GUC variables */
int max_replication_slots = 10; /* the maximum number of replication
* slots */
+int max_repack_replication_slots = 5; /* the maximum number of slots
+ * for REPACK */
/*
* Invalidate replication slots that have remained idle longer than this
{
Size size;
- if (max_replication_slots == 0)
+ if (max_replication_slots + max_repack_replication_slots == 0)
return;
size = offsetof(ReplicationSlotCtlData, replication_slots);
size = add_size(size,
- mul_size(max_replication_slots, sizeof(ReplicationSlot)));
+ mul_size(max_replication_slots + max_repack_replication_slots,
+ sizeof(ReplicationSlot)));
ShmemRequestStruct(.name = "ReplicationSlot Ctl",
.size = size,
.ptr = (void **) &ReplicationSlotCtl,
static void
ReplicationSlotsShmemInit(void *arg)
{
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
* db_specific: logical decoding is db specific; if the slot is going to
* be used for that pass true, otherwise false.
* two_phase: If enabled, allows decoding of prepared transactions.
+ * repack: If true, use a slot from the pool for REPACK.
* failover: If enabled, allows the slot to be synced to standbys so
* that logical replication can be resumed after failover.
* synced: True if the slot is synchronized from the primary server.
void
ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
- bool two_phase, bool failover, bool synced)
+ bool two_phase, bool repack, bool failover, bool synced)
{
ReplicationSlot *slot = NULL;
- int i;
+ int startpoint,
+ endpoint;
Assert(MyReplicationSlot == NULL);
LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
/*
- * Check for name collision, and identify an allocatable slot. We need to
- * hold ReplicationSlotControlLock in shared mode for this, so that nobody
- * else can change the in_use flags while we're looking at them.
+ * Check for name collision (across the whole array), and identify an
+ * allocatable slot (in the array slice specific to our current use case:
+ * either general, or REPACK only). We need to hold
+ * ReplicationSlotControlLock in shared mode for this, so that nobody else
+ * can change the in_use flags while we're looking at them.
*/
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ startpoint = !repack ? 0 : max_replication_slots;
+ endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("replication slot \"%s\" already exists", name)));
- if (!s->in_use && slot == NULL)
+
+ if (i >= startpoint && i < endpoint &&
+ !s->in_use && slot == NULL)
slot = s;
}
LWLockRelease(ReplicationSlotControlLock);
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("all replication slots are in use"),
- errhint("Free one or increase \"max_replication_slots\".")));
+ errhint("Free one or increase \"%s\".",
+ repack ? "max_repack_replication_slots" : "max_replication_slots")));
/*
* Since this slot is not in use, nobody should be looking at any part of
if (need_lock)
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
ReplicationSlotIndex(ReplicationSlot *slot)
{
Assert(slot >= ReplicationSlotCtl->replication_slots &&
- slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
+ slot < ReplicationSlotCtl->replication_slots +
+ (max_replication_slots + max_repack_replication_slots));
return slot - ReplicationSlotCtl->replication_slots;
}
restart:
found_valid_logicalslot = false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
if (!already_locked)
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
TransactionId effective_xmin;
Assert(ReplicationSlotCtl != NULL);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn;
XLogRecPtr result = InvalidXLogRecPtr;
int i;
- if (max_replication_slots <= 0)
+ if (max_replication_slots + max_repack_replication_slots <= 0)
return InvalidXLogRecPtr;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s;
XLogRecPtr restart_lsn;
*nslots = *nactive = 0;
- if (max_replication_slots <= 0)
+ if (max_replication_slots + max_repack_replication_slots <= 0)
return false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s;
bool found_valid_logicalslot;
bool dropped = false;
- if (max_replication_slots <= 0)
+ if (max_replication_slots + max_repack_replication_slots <= 0)
return;
restart:
found_valid_logicalslot = false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s;
char *slotname;
{
bool found = false;
- if (max_replication_slots <= 0)
+ if (max_replication_slots + max_repack_replication_slots <= 0)
return false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s;
bool invalidated;
* slots.
*/
void
-CheckSlotRequirements(void)
+CheckSlotRequirements(bool repack)
{
/*
* NB: Adding a new requirement likely means that RestoreSlotFromDisk()
* needs the same check.
*/
- if (max_replication_slots == 0)
+ if (!repack && max_replication_slots == 0)
ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slots can only be used if \"%s\" > 0",
+ "max_replication_slots"));
+
+ if (repack && max_repack_replication_slots == 0)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("REPACK can only be used if \"%s\" > 0",
+ "max_repack_replication_slots"));
if (wal_level < WAL_LEVEL_REPLICA)
ereport(ERROR,
Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
Assert(possible_causes != RS_INVAL_NONE);
- if (max_replication_slots == 0)
+ if (max_replication_slots == 0 && max_repack_replication_slots == 0)
return invalidated;
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
restart:
found_valid_logicalslot = false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
bool released_lock = false;
*/
LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
- for (i = 0; i < max_replication_slots; i++)
+ for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
char path[MAXPGPATH];
FreeDir(replication_dir);
/* currently no slots exist, we're done. */
- if (max_replication_slots <= 0)
+ if (max_replication_slots + max_repack_replication_slots <= 0)
return;
/* Now that we have recovered all the data, compute replication xmin */
NameStr(cp.slotdata.name)),
errhint("Change \"wal_level\" to be \"replica\" or higher.")));
- /* nothing can be active yet, don't lock anything */
+ /*
+ * Nothing can be active yet, don't lock anything. Note we iterate up to
+ * max_replication_slots instead of adding max_repack_replication_slots as
+ * in all other places, because we must enforce the GUC value in case
+ * there were more slots before the shutdown than what it is set up to
+ * now.
+ */
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *slot;
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false,
temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
- false, false);
+ false, false, false);
if (immediately_reserve)
{
CheckSlotPermissions();
- CheckSlotRequirements();
+ CheckSlotRequirements(false);
create_physical_replication_slot(NameStr(*name),
immediately_reserve,
*/
ReplicationSlotCreate(name, true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
- failover, false);
+ false, failover, false);
/*
* Ensure the logical decoding is enabled before initializing the logical
*/
ctx = CreateInitDecodingContext(plugin, NIL,
false, /* just catalogs is OK */
+ false, /* not repack */
restart_lsn,
XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
CheckSlotPermissions();
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(false);
create_logical_replication_slot(NameStr(*name),
NameStr(*plugin),
CheckSlotPermissions();
- CheckSlotRequirements();
+ CheckSlotRequirements(false);
ReplicationSlotDrop(NameStr(*name), true);
currlsn = GetXLogWriteRecPtr();
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
- for (slotno = 0; slotno < max_replication_slots; slotno++)
+ for (slotno = 0; slotno < max_replication_slots + max_repack_replication_slots; slotno++)
{
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
ReplicationSlot slot_contents;
CheckSlotPermissions();
if (logical_slot)
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(false);
else
- CheckSlotRequirements();
+ CheckSlotRequirements(false);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
* managed to create the new slot, we advance the new slot's restart_lsn
* to the source slot's updated restart_lsn the second time we lock it.
*/
- for (int i = 0; i < max_replication_slots; i++)
+ for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
{
ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
- false, false, false);
+ false, false, false, false);
if (reserve_wal)
{
Assert(cmd->kind == REPLICATION_KIND_LOGICAL);
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(false);
/*
* Initially create persistent slot as ephemeral - that allows us to
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- two_phase, failover, false);
+ two_phase, false, failover, false);
/*
* Do options check early so that we can bail before calling the
Assert(IsLogicalDecodingEnabled());
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
+ false,
InvalidXLogRecPtr,
XL_ROUTINE(.page_read = logical_read_xlog_page,
.segment_open = WalSndSegmentOpen,
QueryCompletion qc;
/* make sure that our requirements are still fulfilled */
- CheckLogicalDecodingRequirements();
+ CheckLogicalDecodingRequirements(false);
Assert(!MyReplicationSlot);
max => 'MAX_BACKENDS',
},
+{ name => 'max_repack_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
+ short_desc => 'Sets the maximum number of replication slots for use by REPACK.',
+ variable => 'max_repack_replication_slots',
+ boot_val => '5',
+ min => '0',
+ max => 'MAX_BACKENDS',
+},
+
/* see max_wal_senders */
{ name => 'max_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
short_desc => 'Sets the maximum number of simultaneously defined replication slots.',
# (change requires restart)
#max_replication_slots = 10 # max number of replication slots
# (change requires restart)
+#max_repack_replication_slots = 5 # max number of replication slots for REPACK
+ # (change requires restart)
#wal_keep_size = 0 # in megabytes; 0 disables
#max_slot_wal_keep_size = -1 # in megabytes; -1 disables
#idle_replication_slot_timeout = 0 # in seconds; 0 disables
} LogicalDecodingContext;
-extern void CheckLogicalDecodingRequirements(void);
+extern void CheckLogicalDecodingRequirements(bool repack);
extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
+ bool for_repack,
XLogRecPtr restart_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
/* GUCs */
extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT int max_repack_replication_slots;
extern PGDLLIMPORT char *synchronized_standby_slots;
extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
- bool two_phase, bool failover,
+ bool two_phase, bool repack, bool failover,
bool synced);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(bool is_shutdown);
-extern void CheckSlotRequirements(void);
+extern void CheckSlotRequirements(bool repack);
extern void CheckSlotPermissions(void);
extern ReplicationSlotInvalidationCause
GetSlotInvalidationCause(const char *cause_name);