bool launcher_running;
/*
- * PID of the worker process, if it's currently running, of InvalidPid if
- * none. This is set by the worker launcher when it starts waiting for a
- * worker process to finish.
+ * Every time a new worker is launched, it's assigned a unique invocation
+ * number by incrementing this counter.
*/
- pid_t worker_pid;
+ uint64 worker_invocation_counter;
+
+ /*
+ * Information about the current worker, if it's currently running. These
+ * are set by the worker launcher.
+ */
+ uint64 worker_invocation; /* unique invocation number */
+ Oid database_oid; /* database it's processing */
+ pid_t worker_pid; /* worker process's PID */
/*
* These fields indicate the target state that the worker is currently
/* Flag set by the interrupt handler */
static volatile sig_atomic_t abort_requested = false;
+static uint64 worker_invocation;
+
/*
* Have we set the DataChecksumsStateStruct->launcher_running flag?
* If we have, we need to clear it before exiting!
.request_fn = DataChecksumsShmemRequest,
};
-#define CHECK_FOR_ABORT_REQUEST() \
+#define CHECK_FOR_LAUNCHER_ABORT_REQUEST() \
+ do { \
+ Assert(MyBackendType == B_DATACHECKSUMSWORKER_LAUNCHER); \
+ LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED); \
+ if (DataChecksumState->launch_operation != operation) \
+ abort_requested = true; \
+ LWLockRelease(DataChecksumsWorkerLock); \
+ } while (0)
+
+#define CHECK_FOR_WORKER_ABORT_REQUEST() \
do { \
+ Assert(MyBackendType == B_DATACHECKSUMSWORKER_WORKER); \
LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED); \
- if (DataChecksumState->launch_operation != operation) \
+ if (DataChecksumState->worker_invocation != worker_invocation || \
+ DataChecksumState->launch_operation != operation) \
abort_requested = true; \
LWLockRelease(DataChecksumsWorkerLock); \
} while (0)
/* Check if we are asked to abort, the abortion will bubble up. */
Assert(operation == ENABLE_DATACHECKSUMS);
- LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
- if (DataChecksumState->launch_operation == DISABLE_DATACHECKSUMS)
- abort_requested = true;
- LWLockRelease(DataChecksumsWorkerLock);
-
+ CHECK_FOR_WORKER_ABORT_REQUEST();
if (abort_requested)
return false;
BackgroundWorkerHandle *bgw_handle;
BgwHandleStatus status;
pid_t pid;
+ uint64 invocation;
char activity[NAMEDATALEN + 64];
DataChecksumsWorkerResult result;
+ LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+
/*
* Initialize result to FAILED. The worker will change it to SUCCESSFUL
* if it completes successfully.
*/
- LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
DataChecksumState->worker_result = DATACHECKSUMSWORKER_FAILED;
DataChecksumState->worker_pid = InvalidPid;
+
+ invocation = ++DataChecksumState->worker_invocation_counter;
+ DataChecksumState->worker_invocation = invocation;
+ DataChecksumState->database_oid = db->dboid;
+
LWLockRelease(DataChecksumsWorkerLock);
memset(&bgw, 0, sizeof(bgw));
snprintf(bgw.bgw_type, BGW_MAXLEN, "datachecksums worker");
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
- bgw.bgw_main_arg = ObjectIdGetDatum(db->dboid);
+ /* pass the invocation number to the worker process */
+ bgw.bgw_main_arg = UInt64GetDatum(invocation);
/*
* If there are no worker slots available, there is little we can do. If
* for it we can see a STOPPED status here without it being a failure.
*/
LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
+ Assert(DataChecksumState->worker_invocation == invocation);
if (DataChecksumState->worker_result == DATACHECKSUMSWORKER_SUCCESSFUL)
{
LWLockRelease(DataChecksumsWorkerLock);
/* Save the pid of the worker so we can signal it later */
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+ Assert(DataChecksumState->worker_invocation == invocation);
DataChecksumState->worker_pid = pid;
LWLockRelease(DataChecksumsWorkerLock);
errhint("Restart the database and restart data checksum processing by calling pg_enable_data_checksums()."));
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+ Assert(DataChecksumState->worker_invocation == invocation);
result = DataChecksumState->worker_result;
DataChecksumState->worker_pid = InvalidPid;
LWLockRelease(DataChecksumsWorkerLock);
errhint("Data checksums processing must be restarted manually after cluster restart."));
CHECK_FOR_INTERRUPTS();
- CHECK_FOR_ABORT_REQUEST();
+ CHECK_FOR_LAUNCHER_ABORT_REQUEST();
if (abort_requested)
break;
* If the target state changed during processing then it's not a
* failure, so restart processing instead.
*/
- LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
- if (DataChecksumState->launch_operation != operation)
- {
- LWLockRelease(DataChecksumsWorkerLock);
+ CHECK_FOR_LAUNCHER_ABORT_REQUEST();
+ if (abort_requested)
goto done;
- }
- LWLockRelease(DataChecksumsWorkerLock);
ereport(ERROR,
errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("unable to enable data checksums in cluster"));
void
DataChecksumsWorkerMain(Datum arg)
{
- Oid dboid = DatumGetObjectId(arg);
+ Oid dboid;
List *RelationList = NIL;
List *InitialTempTableList = NIL;
BufferAccessStrategy strategy;
bool retried = false;
#endif
+ worker_invocation = DatumGetUInt64(arg);
+
operation = ENABLE_DATACHECKSUMS;
pqsignal(SIGTERM, die);
MyBackendType = B_DATACHECKSUMSWORKER_WORKER;
init_ps_display(NULL);
+ LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
+ if (DataChecksumState->worker_invocation != worker_invocation)
+ {
+ LWLockRelease(DataChecksumsWorkerLock);
+ return;
+ }
+ dboid = DataChecksumState->database_oid;
+ LWLockRelease(DataChecksumsWorkerLock);
+
BackgroundWorkerInitializeConnectionByOid(dboid, InvalidOid,
BGWORKER_BYPASS_ALLOWCONN);
*/
InitialTempTableList = BuildRelationList(true, false);
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+ if (DataChecksumState->worker_invocation != worker_invocation)
+ {
+ LWLockRelease(DataChecksumsWorkerLock);
+ return;
+ }
process_shared = DataChecksumState->process_shared_catalogs;
/*
pgstat_progress_update_param(PROGRESS_DATACHECKSUMS_RELS_DONE,
++rels_done);
CHECK_FOR_INTERRUPTS();
- CHECK_FOR_ABORT_REQUEST();
+ CHECK_FOR_WORKER_ABORT_REQUEST();
if (abort_requested)
break;
* to be refreshed.
*/
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+ if (DataChecksumState->worker_invocation != worker_invocation)
+ {
+ LWLockRelease(DataChecksumsWorkerLock);
+ break;
+ }
if ((DataChecksumState->launch_cost_delay != DataChecksumState->cost_delay)
|| (DataChecksumState->launch_cost_limit != DataChecksumState->cost_limit))
{
if (aborted || abort_requested)
{
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
- DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
+ if (DataChecksumState->worker_invocation == worker_invocation)
+ DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
LWLockRelease(DataChecksumsWorkerLock);
ereport(DEBUG1,
errmsg("data checksum processing aborted in database OID %u",
WAIT_EVENT_CHECKSUM_ENABLE_TEMPTABLE_WAIT);
CHECK_FOR_INTERRUPTS();
- CHECK_FOR_ABORT_REQUEST();
+ CHECK_FOR_WORKER_ABORT_REQUEST();
if (aborted || abort_requested)
{
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
- DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
+ if (DataChecksumState->worker_invocation == worker_invocation)
+ DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
LWLockRelease(DataChecksumsWorkerLock);
ereport(LOG,
errmsg("data checksum processing aborted in database OID %u",
pgstat_progress_end_command();
LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
- DataChecksumState->worker_result = DATACHECKSUMSWORKER_SUCCESSFUL;
+ if (DataChecksumState->worker_invocation == worker_invocation)
+ DataChecksumState->worker_result = DATACHECKSUMSWORKER_SUCCESSFUL;
LWLockRelease(DataChecksumsWorkerLock);
}