#include "tcop/tcopprot.h"
#include "utils/memutils.h"
-#define REPL_PLUGIN_NAME "pgrepack"
+#define PGREPACK_PLUGIN "pgrepack"
static void RepackWorkerShutdown(int code, Datum arg);
static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
Relation rel;
Oid toastrelid;
LogicalDecodingContext *ctx;
- NameData slotname;
+ char slotname[NAMEDATALEN];
RepackDecodingState *dstate;
MemoryContext oldcxt;
*/
Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
- /*
- * Make sure we can use logical decoding.
- */
+ /* Make sure we can use logical decoding */
CheckLogicalDecodingRequirements(true);
/*
- * A single backend should not execute multiple REPACK commands at a time,
- * so use PID to make the slot unique.
+ * Create the replication slot we'll use, and enable logical decoding in
+ * case it isn't already on.
*
- * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
+ * Make the slot RS_TEMPORARY so that it's removed on ERROR. A backend
+ * cannot execute multiple REPACK commands at a time, so the PID is enough
+ * to make the slot name unique.
*/
- snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
- ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
+ snprintf(slotname, NAMEDATALEN, "pg_repack_%d", MyProcPid);
+ ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true,
false, false);
-
EnsureLogicalDecodingEnabled();
/*
- * Neither prepare_write nor do_write callback nor update_progress is
- * useful for us.
- */
- ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
- NIL,
- true,
- true,
- InvalidXLogRecPtr,
- XL_ROUTINE(.page_read = read_local_xlog_page,
- .segment_open = wal_segment_open,
- .segment_close = wal_segment_close),
- NULL, NULL, NULL);
-
- /*
- * We don't have control on setting fast_forward, so at least check it.
+ * Set up repacked_rel_locator and repacked_rel_toast_locator, which we
+ * use to skip decoding of unrelated relations.
*/
- Assert(!ctx->fast_forward);
-
- /* Avoid logical decoding of other relations. */
rel = table_open(relid, AccessShareLock);
repacked_rel_locator = rel->rd_locator;
toastrelid = rel->rd_rel->reltoastrelid;
}
table_close(rel, AccessShareLock);
- DecodingContextFindStartpoint(ctx);
-
/*
- * decode_concurrent_changes() needs non-blocking callback.
+ * Set up our logical decoding context. We initially use the blocking
+ * read_local_xlog_page until we find the start point, and switch to the
+ * non-blocking interface afterwards.
*/
- ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
+ ctx = CreateInitDecodingContext(PGREPACK_PLUGIN,
+ NIL,
+ true,
+ true,
+ InvalidXLogRecPtr,
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
+ NULL, NULL, NULL);
- /* Some WAL records should have been read. */
- Assert(XLogRecPtrIsValid(ctx->reader->EndRecPtr));
+ /* Complete setup of output_writer_private */
+ dstate = (RepackDecodingState *) ctx->output_writer_private;
+ dstate->relid = relid;
+ dstate->worker_cxt = CurrentMemoryContext;
+ dstate->worker_resowner = CurrentResourceOwner;
+
+ /* We don't have control on fast_forward, but verify it's sane */
+ Assert(!ctx->fast_forward);
+
+ /* Find our decoding starting point. */
+ DecodingContextFindStartpoint(ctx);
+
+ /* From this point on, we need non-blocking WAL reads */
+ ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
/*
* Initialize repack_current_segment so that we can notice WAL segment
XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
wal_segment_size);
- /* Our private state belongs to the decoding context. */
- oldcxt = MemoryContextSwitchTo(ctx->context);
-
/*
- * read_local_xlog_page_no_wait() needs to be able to indicate the end of
- * WAL.
+ * Set up our reader private state to let the page-read callback notify
+ * when end-of-WAL has been reached. This lives in the same context as
+ * the logical decoding itself.
*/
+ oldcxt = MemoryContextSwitchTo(ctx->context);
ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
- dstate = palloc0_object(RepackDecodingState);
MemoryContextSwitchTo(oldcxt);
-#ifdef USE_ASSERT_CHECKING
- dstate->relid = relid;
-#endif
-
- dstate->change_cxt = AllocSetContextCreate(ctx->context,
- "REPACK - change",
- ALLOCSET_DEFAULT_SIZES);
-
- /* The file will be set as soon as we have it opened. */
- dstate->file = NULL;
-
- /*
- * Memory context and resource owner for long-lived resources.
- */
- dstate->worker_cxt = CurrentMemoryContext;
- dstate->worker_resowner = CurrentResourceOwner;
-
- ctx->output_writer_private = dstate;
-
return ctx;
}
#include "postgres.h"
#include "access/detoast.h"
+#include "commands/repack.h"
#include "commands/repack_internal.h"
#include "replication/snapbuild.h"
#include "utils/memutils.h"
repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init)
{
- ctx->output_plugin_private = NULL;
+ RepackDecodingState *dstate;
+
+ if (!AmRepackWorker())
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("unsupported use of logical decoding plugin \"%s\"",
+ "pgrepack"),
+ errdetail("This plugin can only be used by %s.",
+ "REPACK (CONCURRENTLY)"));
+
+ /* Initial setup of our private state */
+ Assert(CurrentMemoryContext == ctx->context);
+ dstate = palloc0_object(RepackDecodingState);
+ dstate->change_cxt = AllocSetContextCreate(ctx->context,
+ "REPACK - change",
+ ALLOCSET_DEFAULT_SIZES);
+ /* repack_setup_logical_decoding fills in the rest */
+ ctx->output_writer_private = dstate;
/* Probably unnecessary, as we don't use the SQL interface ... */
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;