]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Distinguish datacheckums worker invocations more reliably
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 24 Jun 2026 12:07:33 +0000 (15:07 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 24 Jun 2026 12:07:33 +0000 (15:07 +0300)
In some corner cases, a new datachecksums worker could be launched
while an old one was still running.  If you're really unlucky, the old
worker could set the worker_result in shared memory and mislead the
launcher to think that a newer worker invocation completed
successfully, even though it failed for some reason.  That's highly
unlikely to happen in practice as it requires several race conditions
with workers and launchers starting, failing and succeeding and at the
right moments.  Nevertheless, better to tighten it up.

To distinguish different worker invocations, assign a unique
'worker_invocation' number every time a new worker is launched.  In
the worker, check that the invocation number matches before setting
the worker result.  This ensures that the result always belongs to the
latest invocation.

Reviewed-by: Daniel Gustafsson <daniel@yesql.se>
Discussion: https://www.postgresql.org/message-id/b283fbb9-298e-4953-9120-eefaf24fae20@iki.fi

src/backend/postmaster/datachecksum_state.c

index 1ad8fea93f04cc16206285b2f2c95ef925ae1ada..68557c16cb94f3e05fcf258277f0cd0598811920 100644 (file)
@@ -315,11 +315,18 @@ typedef struct DataChecksumsStateStruct
        bool            launcher_running;
 
        /*
-        * PID of the worker process, if it's currently running, of InvalidPid if
-        * none. This is set by the worker launcher when it starts waiting for a
-        * worker process to finish.
+        * Every time a new worker is launched, it's assigned a unique invocation
+        * number by incrementing this counter.
         */
-       pid_t           worker_pid;
+       uint64          worker_invocation_counter;
+
+       /*
+        * Information about the current worker, if it's currently running.  These
+        * are set by the worker launcher.
+        */
+       uint64          worker_invocation;      /* unique invocation number */
+       Oid                     database_oid;   /* database it's processing */
+       pid_t           worker_pid;             /* worker process's PID */
 
        /*
         * These fields indicate the target state that the worker is currently
@@ -361,6 +368,8 @@ typedef struct DataChecksumsWorkerDatabase
 /* Flag set by the interrupt handler */
 static volatile sig_atomic_t abort_requested = false;
 
+static uint64 worker_invocation;
+
 /*
  * Have we set the DataChecksumsStateStruct->launcher_running flag?
  * If we have, we need to clear it before exiting!
@@ -389,10 +398,21 @@ const ShmemCallbacks DataChecksumsShmemCallbacks = {
        .request_fn = DataChecksumsShmemRequest,
 };
 
-#define CHECK_FOR_ABORT_REQUEST() \
+#define CHECK_FOR_LAUNCHER_ABORT_REQUEST() \
+       do {                                                                                                                    \
+               Assert(MyBackendType == B_DATACHECKSUMSWORKER_LAUNCHER);        \
+               LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);                      \
+               if (DataChecksumState->launch_operation != operation)           \
+                       abort_requested = true;                                                                 \
+               LWLockRelease(DataChecksumsWorkerLock);                                         \
+       } while (0)
+
+#define CHECK_FOR_WORKER_ABORT_REQUEST() \
        do {                                                                                                                    \
+               Assert(MyBackendType == B_DATACHECKSUMSWORKER_WORKER);          \
                LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);                      \
-               if (DataChecksumState->launch_operation != operation)           \
+               if (DataChecksumState->worker_invocation != worker_invocation || \
+                       DataChecksumState->launch_operation != operation)               \
                        abort_requested = true;                                                                 \
                LWLockRelease(DataChecksumsWorkerLock);                                         \
        } while (0)
@@ -726,11 +746,7 @@ ProcessSingleRelationFork(Relation reln, ForkNumber forkNum, BufferAccessStrateg
 
                /* Check if we are asked to abort, the abortion will bubble up. */
                Assert(operation == ENABLE_DATACHECKSUMS);
-               LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
-               if (DataChecksumState->launch_operation == DISABLE_DATACHECKSUMS)
-                       abort_requested = true;
-               LWLockRelease(DataChecksumsWorkerLock);
-
+               CHECK_FOR_WORKER_ABORT_REQUEST();
                if (abort_requested)
                        return false;
 
@@ -813,16 +829,23 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
        BackgroundWorkerHandle *bgw_handle;
        BgwHandleStatus status;
        pid_t           pid;
+       uint64          invocation;
        char            activity[NAMEDATALEN + 64];
        DataChecksumsWorkerResult result;
 
+       LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+
        /*
         * Initialize result to FAILED.  The worker will change it to SUCCESSFUL
         * if it completes successfully.
         */
-       LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
        DataChecksumState->worker_result = DATACHECKSUMSWORKER_FAILED;
        DataChecksumState->worker_pid = InvalidPid;
+
+       invocation = ++DataChecksumState->worker_invocation_counter;
+       DataChecksumState->worker_invocation = invocation;
+       DataChecksumState->database_oid = db->dboid;
+
        LWLockRelease(DataChecksumsWorkerLock);
 
        memset(&bgw, 0, sizeof(bgw));
@@ -834,7 +857,8 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
        snprintf(bgw.bgw_type, BGW_MAXLEN, "datachecksums worker");
        bgw.bgw_restart_time = BGW_NEVER_RESTART;
        bgw.bgw_notify_pid = MyProcPid;
-       bgw.bgw_main_arg = ObjectIdGetDatum(db->dboid);
+       /* pass the invocation number to the worker process */
+       bgw.bgw_main_arg = UInt64GetDatum(invocation);
 
        /*
         * If there are no worker slots available, there is little we can do.  If
@@ -858,6 +882,7 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
                 * for it we can see a STOPPED status here without it being a failure.
                 */
                LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
+               Assert(DataChecksumState->worker_invocation == invocation);
                if (DataChecksumState->worker_result == DATACHECKSUMSWORKER_SUCCESSFUL)
                {
                        LWLockRelease(DataChecksumsWorkerLock);
@@ -901,6 +926,7 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
 
        /* Save the pid of the worker so we can signal it later */
        LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+       Assert(DataChecksumState->worker_invocation == invocation);
        DataChecksumState->worker_pid = pid;
        LWLockRelease(DataChecksumsWorkerLock);
 
@@ -917,6 +943,7 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
                                errhint("Restart the database and restart data checksum processing by calling pg_enable_data_checksums()."));
 
        LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+       Assert(DataChecksumState->worker_invocation == invocation);
        result = DataChecksumState->worker_result;
        DataChecksumState->worker_pid = InvalidPid;
        LWLockRelease(DataChecksumsWorkerLock);
@@ -1044,7 +1071,7 @@ WaitForAllTransactionsToFinish(void)
                                        errhint("Data checksums processing must be restarted manually after cluster restart."));
 
                CHECK_FOR_INTERRUPTS();
-               CHECK_FOR_ABORT_REQUEST();
+               CHECK_FOR_LAUNCHER_ABORT_REQUEST();
 
                if (abort_requested)
                        break;
@@ -1145,13 +1172,9 @@ again:
                         * If the target state changed during processing then it's not a
                         * failure, so restart processing instead.
                         */
-                       LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
-                       if (DataChecksumState->launch_operation != operation)
-                       {
-                               LWLockRelease(DataChecksumsWorkerLock);
+                       CHECK_FOR_LAUNCHER_ABORT_REQUEST();
+                       if (abort_requested)
                                goto done;
-                       }
-                       LWLockRelease(DataChecksumsWorkerLock);
                        ereport(ERROR,
                                        errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                                        errmsg("unable to enable data checksums in cluster"));
@@ -1520,7 +1543,7 @@ BuildRelationList(bool temp_relations, bool include_shared)
 void
 DataChecksumsWorkerMain(Datum arg)
 {
-       Oid                     dboid = DatumGetObjectId(arg);
+       Oid                     dboid;
        List       *RelationList = NIL;
        List       *InitialTempTableList = NIL;
        BufferAccessStrategy strategy;
@@ -1531,6 +1554,8 @@ DataChecksumsWorkerMain(Datum arg)
        bool            retried = false;
 #endif
 
+       worker_invocation = DatumGetUInt64(arg);
+
        operation = ENABLE_DATACHECKSUMS;
 
        pqsignal(SIGTERM, die);
@@ -1541,6 +1566,15 @@ DataChecksumsWorkerMain(Datum arg)
        MyBackendType = B_DATACHECKSUMSWORKER_WORKER;
        init_ps_display(NULL);
 
+       LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
+       if (DataChecksumState->worker_invocation != worker_invocation)
+       {
+               LWLockRelease(DataChecksumsWorkerLock);
+               return;
+       }
+       dboid = DataChecksumState->database_oid;
+       LWLockRelease(DataChecksumsWorkerLock);
+
        BackgroundWorkerInitializeConnectionByOid(dboid, InvalidOid,
                                                                                          BGWORKER_BYPASS_ALLOWCONN);
 
@@ -1556,6 +1590,11 @@ DataChecksumsWorkerMain(Datum arg)
         */
        InitialTempTableList = BuildRelationList(true, false);
        LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+       if (DataChecksumState->worker_invocation != worker_invocation)
+       {
+               LWLockRelease(DataChecksumsWorkerLock);
+               return;
+       }
        process_shared = DataChecksumState->process_shared_catalogs;
 
        /*
@@ -1611,7 +1650,7 @@ DataChecksumsWorkerMain(Datum arg)
                pgstat_progress_update_param(PROGRESS_DATACHECKSUMS_RELS_DONE,
                                                                         ++rels_done);
                CHECK_FOR_INTERRUPTS();
-               CHECK_FOR_ABORT_REQUEST();
+               CHECK_FOR_WORKER_ABORT_REQUEST();
 
                if (abort_requested)
                        break;
@@ -1622,6 +1661,11 @@ DataChecksumsWorkerMain(Datum arg)
                 * to be refreshed.
                 */
                LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+               if (DataChecksumState->worker_invocation != worker_invocation)
+               {
+                       LWLockRelease(DataChecksumsWorkerLock);
+                       break;
+               }
                if ((DataChecksumState->launch_cost_delay != DataChecksumState->cost_delay)
                        || (DataChecksumState->launch_cost_limit != DataChecksumState->cost_limit))
                {
@@ -1650,7 +1694,8 @@ DataChecksumsWorkerMain(Datum arg)
        if (aborted || abort_requested)
        {
                LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
-               DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
+               if (DataChecksumState->worker_invocation == worker_invocation)
+                       DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
                LWLockRelease(DataChecksumsWorkerLock);
                ereport(DEBUG1,
                                errmsg("data checksum processing aborted in database OID %u",
@@ -1717,12 +1762,13 @@ DataChecksumsWorkerMain(Datum arg)
                                                 WAIT_EVENT_CHECKSUM_ENABLE_TEMPTABLE_WAIT);
 
                CHECK_FOR_INTERRUPTS();
-               CHECK_FOR_ABORT_REQUEST();
+               CHECK_FOR_WORKER_ABORT_REQUEST();
 
                if (aborted || abort_requested)
                {
                        LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
-                       DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
+                       if (DataChecksumState->worker_invocation == worker_invocation)
+                               DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
                        LWLockRelease(DataChecksumsWorkerLock);
                        ereport(LOG,
                                        errmsg("data checksum processing aborted in database OID %u",
@@ -1737,6 +1783,7 @@ DataChecksumsWorkerMain(Datum arg)
        pgstat_progress_end_command();
 
        LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
-       DataChecksumState->worker_result = DATACHECKSUMSWORKER_SUCCESSFUL;
+       if (DataChecksumState->worker_invocation == worker_invocation)
+               DataChecksumState->worker_result = DATACHECKSUMSWORKER_SUCCESSFUL;
        LWLockRelease(DataChecksumsWorkerLock);
 }