git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Prevent excessive delays before launching new logrep workers.
author: Tom Lane <tgl@sss.pgh.pa.us>
Tue, 24 Jun 2025 18:14:04 +0000 (14:14 -0400)
committer: Tom Lane <tgl@sss.pgh.pa.us>
Tue, 24 Jun 2025 18:14:04 +0000 (14:14 -0400)
The logical replication launcher process would sometimes sleep
for as much as 3 minutes before noticing that it is supposed
to launch a new worker.  This could happen if
(1) WaitForReplicationWorkerAttach absorbed a process latch wakeup
that was meant to cause ApplyLauncherMain to do work, or
(2) logicalrep_worker_launch reported failure, either because of
resource limits or because the new worker terminated immediately.

In case (2), the expected behavior is that we retry the launch after
wal_retrieve_retry_interval, but that didn't reliably happen.

It's not clear how often such conditions would occur in the field,
but in our subscription test suite they are somewhat common,
especially in tests that exercise cases that cause quick worker
failure.  That causes the tests to take substantially longer than
they ought to do on typical setups.

To fix (1), make WaitForReplicationWorkerAttach re-set the latch
before returning if it cleared it while looping.  To fix (2), ensure
that we reduce wait_time to no more than wal_retrieve_retry_interval
when logicalrep_worker_launch reports failure.  In passing, fix a
couple of perhaps-hypothetical race conditions, e.g. examining
worker->in_use without a lock.

Backpatch to v16.  Problem (2) didn't exist before commit 5a3a95385
because the previous code always set wait_time to
wal_retrieve_retry_interval when launching a worker, regardless of
success or failure of the launch.  That behavior also greatly
mitigated problem (1), so I'm not excited about adapting the remainder
of the patch to the substantially-different code in older branches.

Author: Tom Lane <tgl@sss.pgh.pa.us>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Discussion: https://postgr.es/m/817604.1750723007@sss.pgh.pa.us
Backpatch-through: 16

src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c

index 27c3a91fb75ea6c769d12ab2d2b737873f9fd51c..294d0d74d8c6b36363e3073e54f0a0ac1d0b4f40 100644 (file)
@@ -184,12 +184,14 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                                                           uint16 generation,
                                                           BackgroundWorkerHandle *handle)
 {
-       BgwHandleStatus status;
-       int                     rc;
+       bool            result = false;
+       bool            dropped_latch = false;
 
        for (;;)
        {
+               BgwHandleStatus status;
                pid_t           pid;
+               int                     rc;
 
                CHECK_FOR_INTERRUPTS();
 
@@ -198,8 +200,9 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                /* Worker either died or has started. Return false if died. */
                if (!worker->in_use || worker->proc)
                {
+                       result = worker->in_use;
                        LWLockRelease(LogicalRepWorkerLock);
-                       return worker->in_use;
+                       break;
                }
 
                LWLockRelease(LogicalRepWorkerLock);
@@ -214,7 +217,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                        if (generation == worker->generation)
                                logicalrep_worker_cleanup(worker);
                        LWLockRelease(LogicalRepWorkerLock);
-                       return false;
+                       break;                          /* result is already false */
                }
 
                /*
@@ -229,8 +232,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                {
                        ResetLatch(MyLatch);
                        CHECK_FOR_INTERRUPTS();
+                       dropped_latch = true;
                }
        }
+
+       /*
+        * If we had to clear a latch event in order to wait, be sure to restore
+        * it before exiting.  Otherwise caller may miss events.
+        */
+       if (dropped_latch)
+               SetLatch(MyLatch);
+
+       return result;
 }
 
 /*
@@ -1197,10 +1210,21 @@ ApplyLauncherMain(Datum main_arg)
                                (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
                        {
                                ApplyLauncherSetWorkerStartTime(sub->oid, now);
-                               logicalrep_worker_launch(WORKERTYPE_APPLY,
-                                                                                sub->dbid, sub->oid, sub->name,
-                                                                                sub->owner, InvalidOid,
-                                                                                DSM_HANDLE_INVALID);
+                               if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
+                                                                                         sub->dbid, sub->oid, sub->name,
+                                                                                         sub->owner, InvalidOid,
+                                                                                         DSM_HANDLE_INVALID))
+                               {
+                                       /*
+                                        * We get here either if we failed to launch a worker
+                                        * (perhaps for resource-exhaustion reasons) or if we
+                                        * launched one but it immediately quit.  Either way, it
+                                        * seems appropriate to try again after
+                                        * wal_retrieve_retry_interval.
+                                        */
+                                       wait_time = Min(wait_time,
+                                                                       wal_retrieve_retry_interval);
+                               }
                        }
                        else
                        {
index b00267f0427086bfe32f4f5c6d9ba627e4cbeb90..c8893ffad965aeb37b9e6078543b905303bf6f8c 100644 (file)
@@ -604,14 +604,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                                                TimestampDifferenceExceeds(hentry->last_start_time, now,
                                                                                                   wal_retrieve_retry_interval))
                                        {
-                                               logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
-                                                                                                MyLogicalRepWorker->dbid,
-                                                                                                MySubscription->oid,
-                                                                                                MySubscription->name,
-                                                                                                MyLogicalRepWorker->userid,
-                                                                                                rstate->relid,
-                                                                                                DSM_HANDLE_INVALID);
+                                               /*
+                                                * Set the last_start_time even if we fail to start
+                                                * the worker, so that we won't retry until
+                                                * wal_retrieve_retry_interval has elapsed.
+                                                */
                                                hentry->last_start_time = now;
+                                               (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+                                                                                                               MyLogicalRepWorker->dbid,
+                                                                                                               MySubscription->oid,
+                                                                                                               MySubscription->name,
+                                                                                                               MyLogicalRepWorker->userid,
+                                                                                                               rstate->relid,
+                                                                                                               DSM_HANDLE_INVALID);
                                        }
                                }
                        }