start_repack_decoding_worker(Oid relid)
{
Size size;
- dsm_segment *seg;
DecodingWorkerShared *shared;
shm_mq *mq;
- shm_mq_handle *mqh;
BackgroundWorker bgw;
+ decoding_worker = palloc0_object(DecodingWorker);
+
/* Setup shared memory. */
size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) +
BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE);
- seg = dsm_create(size, 0);
- shared = (DecodingWorkerShared *) dsm_segment_address(seg);
+ decoding_worker->seg = dsm_create(size, 0);
+
+ shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
shared->initialized = false;
shared->lsn_upto = InvalidXLogRecPtr;
shared->done = false;
- SharedFileSetInit(&shared->sfs, seg);
+ SharedFileSetInit(&shared->sfs, decoding_worker->seg);
shared->last_exported = -1;
SpinLockInit(&shared->mutex);
shared->dbid = MyDatabaseId;
mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue),
REPACK_ERROR_QUEUE_SIZE);
shm_mq_set_receiver(mq, MyProc);
- mqh = shm_mq_attach(mq, seg, NULL);
+
+ decoding_worker->error_mqh = shm_mq_attach(mq, decoding_worker->seg, NULL);
memset(&bgw, 0, sizeof(bgw));
snprintf(bgw.bgw_name, BGW_MAXLEN,
bgw.bgw_restart_time = BGW_NEVER_RESTART;
snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain");
- bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+ bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(decoding_worker->seg));
bgw.bgw_notify_pid = MyProcPid;
- decoding_worker = palloc0_object(DecodingWorker);
if (!RegisterDynamicBackgroundWorker(&bgw, &decoding_worker->handle))
ereport(ERROR,
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
errhint("You might need to increase \"%s\".", "max_worker_processes"));
- decoding_worker->seg = seg;
- decoding_worker->error_mqh = mqh;
-
/*
* The decoding setup must be done before the caller can have XID assigned
* for any reason, otherwise the worker might end up in a deadlock,
static void
stop_repack_decoding_worker(void)
{
- BgwHandleStatus status;
-
- /* Haven't reached the worker startup? */
+ /* Nothing to do if no worker was set up. */
if (decoding_worker == NULL)
return;
- /* Could not register the worker? */
- if (decoding_worker->handle == NULL)
- return;
-
- TerminateBackgroundWorker(decoding_worker->handle);
- /* The worker should really exit before the REPACK command does. */
- HOLD_INTERRUPTS();
- status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
- RESUME_INTERRUPTS();
+ /* Terminate the worker process, if one is running. */
+ if (decoding_worker->handle != NULL)
+ {
+ BgwHandleStatus status;
- if (status == BGWH_POSTMASTER_DIED)
- ereport(FATAL,
- errcode(ERRCODE_ADMIN_SHUTDOWN),
- errmsg("postmaster exited during REPACK command"));
+ TerminateBackgroundWorker(decoding_worker->handle);
+ /* The worker should really exit before the REPACK command does. */
+ HOLD_INTERRUPTS();
+ status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
+ RESUME_INTERRUPTS();
- shm_mq_detach(decoding_worker->error_mqh);
+ if (status == BGWH_POSTMASTER_DIED)
+ ereport(FATAL,
+ errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("postmaster exited during REPACK command"));
+ }
/*
- * If we could not cancel the current sleep due to ERROR, do that before
- * we detach from the shared memory the condition variable is located in.
- * If we did not, the bgworker ERROR handling code would try and fail
- * badly.
+ * Now detach from our shared memory segment. In error cases there might
+ * still be messages from the worker in the queue, which ProcessInterrupts
+ * would try to read; this is pointless (and causes an assertion failure),
+ * so set the global pointer to NULL to have ProcessRepackMessages ignore
+ * them.
+ *
+ * We must also cancel the current sleep, if one is still set up. This is
+ * critical because the CV lives in the DSM that we're about to detach, so
+ * if we omit it, later automatic cleanup tries to clear freed memory.
*/
+ if (decoding_worker->error_mqh != NULL)
+ shm_mq_detach(decoding_worker->error_mqh);
ConditionVariableCancelSleep();
-
- dsm_detach(decoding_worker->seg);
+ if (decoding_worker->seg != NULL)
+ dsm_detach(decoding_worker->seg);
pfree(decoding_worker);
decoding_worker = NULL;
}