#endif
/* State for IO worker management. */
+static TimestampTz io_worker_launch_next_time = 0;
static int io_worker_count = 0;
static PMChild *io_worker_children[MAX_IO_WORKERS];
static void LaunchMissingBackgroundProcesses(void);
static void maybe_start_bgworkers(void);
static bool maybe_reap_io_worker(int pid);
-static void maybe_adjust_io_workers(void);
+static void maybe_start_io_workers(void);
+static TimestampTz maybe_start_io_workers_scheduled_at(void);
static bool CreateOptsFile(int argc, char *argv[], char *fullprogname);
static PMChild *StartChildProcess(BackendType type);
static void StartSysLogger(void);
UpdatePMState(PM_STARTUP);
/* Make sure we can perform I/O while starting up. */
- maybe_adjust_io_workers();
+ maybe_start_io_workers();
/* Start bgwriter and checkpointer so they can help with recovery */
if (CheckpointerPMChild == NULL)
static int
DetermineSleepTime(void)
{
- TimestampTz next_wakeup = 0;
+ TimestampTz next_wakeup;
/*
- * Normal case: either there are no background workers at all, or we're in
- * a shutdown sequence (during which we ignore bgworkers altogether).
+ * If in ImmediateShutdown with a SIGKILL timeout, ignore everything else
+ * and wait for that.
+ *
+ * XXX Shouldn't this also test FatalError?
*/
- if (Shutdown > NoShutdown ||
- (!StartWorkerNeeded && !HaveCrashedWorker))
+ if (Shutdown >= ImmediateShutdown)
{
if (AbortStartTime != 0)
{
return seconds * 1000;
}
- else
- return 60 * 1000;
}
- if (StartWorkerNeeded)
+ /* Time of next maybe_start_io_workers() call, or 0 for none. */
+ next_wakeup = maybe_start_io_workers_scheduled_at();
+
+ /* Ignore bgworkers during shutdown. */
+ if (StartWorkerNeeded && Shutdown == NoShutdown)
return 0;
- if (HaveCrashedWorker)
+ if (HaveCrashedWorker && Shutdown == NoShutdown)
{
dlist_mutable_iter iter;
if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
HandleChildCrash(pid, exitstatus, _("io worker"));
- maybe_adjust_io_workers();
+ /*
+ * A worker that exited with an error might have brought the pool
+ * size below io_min_workers, or allowed the queue to grow to the
+ * point where another worker called for growth.
+ *
+ * In the common case that a worker timed out due to idleness, no
+ * replacement needs to be started. maybe_start_io_workers() will
+ * figure that out.
+ */
+ maybe_start_io_workers();
+
continue;
}
UpdatePMState(PM_STARTUP);
/* Make sure we can perform I/O while starting up. */
- maybe_adjust_io_workers();
+ maybe_start_io_workers();
StartupPMChild = StartChildProcess(B_STARTUP);
Assert(StartupPMChild != NULL);
* A config file change will always lead to this function being called, so
* we always will process the config change in a timely manner.
*/
- maybe_adjust_io_workers();
+ maybe_start_io_workers();
/*
* The checkpointer and the background writer are active from the start,
StartWorkerNeeded = true;
}
+ /* Process IO worker start requests. */
+ if (CheckPostmasterSignal(PMSIGNAL_IO_WORKER_GROW))
+ {
+ /*
+ * No local flag, as the state is exposed through pgaio_worker_*()
+ * functions. This signal is received on potentially actionable level
+ * changes, so that maybe_start_io_workers() will run.
+ */
+ }
+
/* Process background worker state changes. */
if (CheckPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE))
{
}
/*
- * Start or stop IO workers, to close the gap between the number of running
- * workers and the number of configured workers. Used to respond to change of
- * the io_workers GUC (by increasing and decreasing the number of workers), as
- * well as workers terminating in response to errors (by starting
- * "replacement" workers).
+ * Returns the next time at which maybe_start_io_workers() would start one or
+ * more I/O workers. Any time in the past means ASAP, and 0 means no worker
+ * is currently scheduled.
+ *
+ * This is called by DetermineSleepTime() and also maybe_start_io_workers()
+ * itself, to make sure that they agree.
*/
-static void
-maybe_adjust_io_workers(void)
+static TimestampTz
+maybe_start_io_workers_scheduled_at(void)
{
if (!pgaio_workers_enabled())
- return;
+ return 0;
/*
* If we're in final shutting down state, then we're just waiting for all
* processes to exit.
*/
if (pmState >= PM_WAIT_IO_WORKERS)
- return;
+ return 0;
/* Don't start new workers during an immediate shutdown either. */
if (Shutdown >= ImmediateShutdown)
- return;
+ return 0;
/*
* Don't start new workers if we're in the shutdown phase of a crash
* restart. But we *do* need to start if we're already starting up again.
*/
if (FatalError && pmState >= PM_STOP_BACKENDS)
- return;
+ return 0;
+
+ /*
+ * Don't start a worker if we're at or above the maximum. (Excess workers
+ * exit when the GUC is lowered, but the count can be temporarily too high
+ * until they are reaped.)
+ */
+ if (io_worker_count >= io_max_workers)
+ return 0;
+
+ /* If we're under the minimum, start a worker as soon as possible. */
+ if (io_worker_count < io_min_workers)
+ return TIMESTAMP_MINUS_INFINITY; /* start worker ASAP */
+
+ /* Only proceed if a "grow" signal has been received from a worker. */
+ if (!pgaio_worker_pm_test_grow_signal_sent())
+ return 0;
- Assert(pmState < PM_WAIT_IO_WORKERS);
+ /*
+ * maybe_start_io_workers() should start a new I/O worker after this time,
+ * or as soon as possible if it is already in the past.
+ */
+ return io_worker_launch_next_time;
+}
- /* Not enough running? */
- while (io_worker_count < io_workers)
+/*
+ * Start I/O workers if required. Used at startup, to respond to change of
+ * the io_min_workers GUC, when asked to start a new one due to submission
+ * queue backlog, and after workers terminate in response to errors (by
+ * starting "replacement" workers).
+ */
+static void
+maybe_start_io_workers(void)
+{
+ TimestampTz scheduled_at;
+
+ while ((scheduled_at = maybe_start_io_workers_scheduled_at()) != 0)
{
+ TimestampTz now = GetCurrentTimestamp();
PMChild *child;
int i;
+ Assert(pmState < PM_WAIT_IO_WORKERS);
+
+ /* Still waiting for the scheduled time? */
+ if (scheduled_at > now)
+ break;
+
+ /*
+ * Compute next launch time relative to the previous value, so that
+ * time spent on the postmaster's other duties doesn't result in an
+ * inaccurate launch interval.
+ */
+ io_worker_launch_next_time =
+ TimestampTzPlusMilliseconds(io_worker_launch_next_time,
+ io_worker_launch_interval);
+
+ /*
+ * If that's already in the past, the interval is either impossibly
+ * short or we received no requests for new workers for a period.
+ * Compute a new future time relative to now instead.
+ */
+ if (io_worker_launch_next_time <= now)
+ io_worker_launch_next_time =
+ TimestampTzPlusMilliseconds(now, io_worker_launch_interval);
+
+ /*
+ * Check if a grow signal has been received, but the grow request has
+ * been canceled since then because work ran out. We've still
+ * advanced the next launch time, to suppress repeat signals from
+ * workers until then.
+ */
+ if (io_worker_count >= io_min_workers && !pgaio_worker_pm_test_grow())
+ {
+ pgaio_worker_pm_clear_grow_signal_sent();
+ break;
+ }
+
/* find unused entry in io_worker_children array */
for (i = 0; i < MAX_IO_WORKERS; ++i)
{
++io_worker_count;
}
else
- break; /* try again next time */
- }
-
- /* Too many running? */
- if (io_worker_count > io_workers)
- {
- /* ask the IO worker in the highest slot to exit */
- for (int i = MAX_IO_WORKERS - 1; i >= 0; --i)
{
- if (io_worker_children[i] != NULL)
- {
- kill(io_worker_children[i]->pid, SIGUSR2);
- break;
- }
+ /*
+ * Fork failure: we'll try again after the launch interval
+ * expires, or be called again without delay if we don't yet have
+ * io_min_workers. Don't loop here though, the postmaster has
+ * other duties.
+ */
+ break;
}
}
+
+ /*
+ * Workers decide when to shut down by themselves, according to the
+ * io_max_workers and io_worker_idle_timeout GUCs.
+ */
}
* infrastructure for reopening the file, and must processed synchronously by
* the client code when submitted.
*
- * So that the submitter can make just one system call when submitting a batch
- * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
- * could be improved by using futexes instead of latches to wake N waiters.
+ * The pool of workers tries to stabilize at a size that can handle recently
+ * seen variation in demand, within the configured limits.
*
* This method of AIO is available in all builds on all operating systems, and
* is the default.
#include "postgres.h"
+#include <limits.h>
+
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
+#include "storage/lwlock.h"
+#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "tcop/tcopprot.h"
#include "utils/ps_status.h"
#include "utils/wait_event.h"
+/*
+ * Saturation for counters used to estimate wakeup:IO ratio.
+ *
+ * We maintain hist_wakeups for wakeups received and hist_ios for IOs
+ * processed by each worker. When either counter reaches this saturation
+ * value, we divide both by two. The result is an exponentially decaying
+ * ratio of wakeups to IOs, with a very short memory.
+ *
+ * If a worker is itself experiencing useless wakeups, it assumes that
+ * higher-numbered workers would experience even more, so it should end the
+ * chain.
+ */
+#define PGAIO_WORKER_WAKEUP_RATIO_SATURATE 4
-/* How many workers should each worker wake up if needed? */
-#define IO_WORKER_WAKEUP_FANOUT 2
-
+/* Debugging support: show current IO and wakeups:ios statistics in ps. */
+/* #define PGAIO_WORKER_SHOW_PS_INFO */
typedef struct PgAioWorkerSubmissionQueue
{
typedef struct PgAioWorkerSlot
{
- Latch *latch;
- bool in_use;
+ ProcNumber proc_number;
} PgAioWorkerSlot;
+/*
+ * Sets of worker IDs are held in a simple bitmap, accessed through functions
+ * that provide a more readable abstraction. If we wanted to support more
+ * workers than that, the contention on the single queue would surely get too
+ * high, so we might want to consider multiple pools instead of widening this.
+ */
+typedef uint64 PgAioWorkerSet;
+
+#define PGAIO_WORKERSET_BITS (sizeof(PgAioWorkerSet) * CHAR_BIT)
+
+static_assert(PGAIO_WORKERSET_BITS >= MAX_IO_WORKERS, "too small");
+
typedef struct PgAioWorkerControl
{
- uint64 idle_worker_mask;
+ /* Seen by postmaster */
+ bool grow;
+ bool grow_signal_sent;
+
+ /* Protected by AioWorkerSubmissionQueueLock. */
+ PgAioWorkerSet idle_workerset;
+
+ /* Protected by AioWorkerControlLock. */
+ PgAioWorkerSet workerset;
+ int nworkers;
+
+ /* Protected by AioWorkerControlLock. */
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerControl;
/* GUCs */
-int io_workers = 3;
+int io_min_workers = 2;
+int io_max_workers = 8;
+int io_worker_idle_timeout = 60000;
+int io_worker_launch_interval = 100;
static int io_worker_queue_size = 64;
-static int MyIoWorkerId;
+static int MyIoWorkerId = -1;
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;
+/*
+ * Initialize a worker ID set to be empty.
+ */
+static void
+pgaio_workerset_initialize(PgAioWorkerSet *set)
+{
+ *set = 0;
+}
+
+/*
+ * Return true if the set contains no worker IDs.
+ */
+static bool
+pgaio_workerset_is_empty(PgAioWorkerSet *set)
+{
+ return *set == 0;
+}
+
+/*
+ * Return a set containing only the given worker ID.
+ */
+static PgAioWorkerSet
+pgaio_workerset_singleton(int worker)
+{
+ Assert(worker >= 0 && worker < MAX_IO_WORKERS);
+ return UINT64_C(1) << worker;
+}
+
+/*
+ * Set *set to contain every valid worker ID, i.e. 0..MAX_IO_WORKERS - 1.
+ */
+static void
+pgaio_workerset_all(PgAioWorkerSet *set)
+{
+ *set = UINT64_MAX >> (PGAIO_WORKERSET_BITS - MAX_IO_WORKERS);
+}
+
+/*
+ * Remove from *set1 every worker ID that is present in *set2.
+ */
+static void
+pgaio_workerset_subtract(PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
+{
+ *set1 &= ~*set2;
+}
+
+/*
+ * Add the given worker ID to the set.  No-op if already present.
+ */
+static void
+pgaio_workerset_insert(PgAioWorkerSet *set, int worker)
+{
+ Assert(worker >= 0 && worker < MAX_IO_WORKERS);
+ *set |= pgaio_workerset_singleton(worker);
+}
+
+/*
+ * Remove the given worker ID from the set.  No-op if not present.
+ */
+static void
+pgaio_workerset_remove(PgAioWorkerSet *set, int worker)
+{
+ Assert(worker >= 0 && worker < MAX_IO_WORKERS);
+ *set &= ~pgaio_workerset_singleton(worker);
+}
+
+/*
+ * Remove every worker ID less than or equal to "worker" from the set,
+ * leaving only strictly higher-numbered workers.
+ */
+static void
+pgaio_workerset_remove_lte(PgAioWorkerSet *set, int worker)
+{
+ Assert(worker >= 0 && worker < MAX_IO_WORKERS);
+
+ /*
+ * Guard the shift: when worker + 1 == PGAIO_WORKERSET_BITS (possible if
+ * MAX_IO_WORKERS == PGAIO_WORKERSET_BITS, which the static_assert above
+ * permits), shifting by the full width of the type would be undefined
+ * behavior.  In that case all IDs are <= worker, so the result is empty.
+ */
+ if (worker + 1 >= PGAIO_WORKERSET_BITS)
+ *set = 0;
+ else
+ *set &= (~(PgAioWorkerSet) 0) << (worker + 1);
+}
+
+/*
+ * Return the highest worker ID in the set.  The set must not be empty.
+ */
+static int
+pgaio_workerset_get_highest(PgAioWorkerSet *set)
+{
+ Assert(!pgaio_workerset_is_empty(set));
+ return pg_leftmost_one_pos64(*set);
+}
+
+/*
+ * Return the lowest worker ID in the set.  The set must not be empty.
+ */
+static int
+pgaio_workerset_get_lowest(PgAioWorkerSet *set)
+{
+ Assert(!pgaio_workerset_is_empty(set));
+ return pg_rightmost_one_pos64(*set);
+}
+
+/*
+ * Remove and return the lowest worker ID in the set.  The set must not be
+ * empty.
+ */
+static int
+pgaio_workerset_pop_lowest(PgAioWorkerSet *set)
+{
+ int worker = pgaio_workerset_get_lowest(set);
+
+ pgaio_workerset_remove(set, worker);
+ return worker;
+}
+
+#ifdef USE_ASSERT_CHECKING
+/*
+ * Assertion-build-only helper: return true if the set contains the given
+ * worker ID.
+ */
+static bool
+pgaio_workerset_contains(PgAioWorkerSet *set, int worker)
+{
+ Assert(worker >= 0 && worker < MAX_IO_WORKERS);
+ return (*set & pgaio_workerset_singleton(worker)) != 0;
+}
+
+/*
+ * Assertion-build-only helper: return the number of worker IDs in the set.
+ */
+static int
+pgaio_workerset_count(PgAioWorkerSet *set)
+{
+ return pg_popcount64(*set);
+}
+#endif
+
static void
pgaio_worker_shmem_request(void *arg)
{
io_worker_submission_queue->size = queue_size;
io_worker_submission_queue->head = 0;
io_worker_submission_queue->tail = 0;
+ io_worker_control->grow = false;
+ pgaio_workerset_initialize(&io_worker_control->workerset);
+ pgaio_workerset_initialize(&io_worker_control->idle_workerset);
- io_worker_control->idle_worker_mask = 0;
for (int i = 0; i < MAX_IO_WORKERS; ++i)
- {
- io_worker_control->workers[i].latch = NULL;
- io_worker_control->workers[i].in_use = false;
- }
+ io_worker_control->workers[i].proc_number = INVALID_PROC_NUMBER;
+}
+
+/*
+ * Tell postmaster that we think a new worker is needed.
+ *
+ * Called by an I/O worker without holding any lock; all accesses to the
+ * shared flags here are unlocked and ordered only by memory barriers.
+ */
+static void
+pgaio_worker_request_grow(void)
+{
+ /*
+ * Suppress useless signaling if we already know that we're at the
+ * maximum. This uses an unlocked read of nworkers, but that's OK for
+ * this heuristic purpose.
+ */
+ if (io_worker_control->nworkers >= io_max_workers)
+ return;
+
+ /* Already requested? */
+ if (io_worker_control->grow)
+ return;
+
+ io_worker_control->grow = true;
+ /* Make "grow" visible before "grow_signal_sent" and the signal. */
+ pg_memory_barrier();
+
+ /*
+ * If the postmaster has already been signaled, don't do it again until
+ * the postmaster clears this flag. There is no point in repeated signals
+ * if grow is being set and cleared repeatedly while the postmaster is
+ * waiting for io_worker_launch_interval, which it applies even to
+ * canceled requests.
+ */
+ if (io_worker_control->grow_signal_sent)
+ return;
+
+ io_worker_control->grow_signal_sent = true;
+ /* Make "grow_signal_sent" visible before the postmaster is woken. */
+ pg_memory_barrier();
+ SendPostmasterSignal(PMSIGNAL_IO_WORKER_GROW);
+}
+
+/*
+ * Cancel any request for a new worker, after observing an empty queue.
+ *
+ * Note that grow_signal_sent is deliberately left set here: the postmaster
+ * clears it (see pgaio_worker_pm_clear_grow_signal_sent()), which suppresses
+ * repeated signals while it is still applying io_worker_launch_interval.
+ */
+static void
+pgaio_worker_cancel_grow(void)
+{
+ if (!io_worker_control->grow)
+ return;
+
+ io_worker_control->grow = false;
+ pg_memory_barrier();
+}
+
+/*
+ * Called by the postmaster to check if a new worker has been requested (but
+ * possibly canceled since).
+ *
+ * io_worker_control is NULL before shared memory is attached; report false
+ * in that case.
+ */
+bool
+pgaio_worker_pm_test_grow_signal_sent(void)
+{
+ pg_memory_barrier();
+ return io_worker_control && io_worker_control->grow_signal_sent;
+}
+
+/*
+ * Called by the postmaster to check if a new worker has been requested and
+ * not canceled since.
+ *
+ * io_worker_control is NULL before shared memory is attached; report false
+ * in that case.
+ */
+bool
+pgaio_worker_pm_test_grow(void)
+{
+ pg_memory_barrier();
+ return io_worker_control && io_worker_control->grow;
+}
+
+/*
+ * Called by the postmaster to clear the request for a new worker.
+ *
+ * Clears both flags, so that a worker may request growth (and signal the
+ * postmaster) again; see pgaio_worker_request_grow().
+ */
+void
+pgaio_worker_pm_clear_grow_signal_sent(void)
+{
+ if (!io_worker_control)
+ return;
+
+ io_worker_control->grow = false;
+ io_worker_control->grow_signal_sent = false;
+ pg_memory_barrier();
+}
static int
-pgaio_worker_choose_idle(void)
+pgaio_worker_choose_idle(int only_workers_above)
{
+ PgAioWorkerSet workerset;
int worker;
- if (io_worker_control->idle_worker_mask == 0)
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
+ workerset = io_worker_control->idle_workerset;
+ if (only_workers_above >= 0)
+ pgaio_workerset_remove_lte(&workerset, only_workers_above);
+ if (pgaio_workerset_is_empty(&workerset))
return -1;
- /* Find the lowest bit position, and clear it. */
- worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
- Assert(io_worker_control->workers[worker].in_use);
+ /* Find the lowest numbered idle worker and mark it not idle. */
+ worker = pgaio_workerset_get_lowest(&workerset);
+ pgaio_workerset_remove(&io_worker_control->idle_workerset, worker);
return worker;
}
+/*
+ * Try to wake a worker by setting its latch, to tell it there are IOs to
+ * process in the submission queue.
+ */
+static void
+pgaio_worker_wake(int worker)
+{
+ ProcNumber proc_number;
+
+ /*
+ * If the selected worker is concurrently exiting, then pgaio_worker_die()
+ * had not yet removed it as of when we saw it in idle_workerset. That's
+ * OK, because it will wake all remaining workers to close wakeup-vs-exit
+ * races: *someone* will see the queued IO. If there are no workers
+ * running, the postmaster will start a new one.
+ */
+ /* NOTE(review): unlocked read of proc_number; assumed benign per above. */
+ proc_number = io_worker_control->workers[worker].proc_number;
+ if (proc_number != INVALID_PROC_NUMBER)
+ SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
+}
+
+/*
+ * Try to wake a set of workers. Used on pool change, to close races
+ * described in the callers.
+ *
+ * The set is passed by value, so the caller's copy is not modified as we
+ * drain it.
+ */
+static void
+pgaio_workerset_wake(PgAioWorkerSet workerset)
+{
+ while (!pgaio_workerset_is_empty(&workerset))
+ pgaio_worker_wake(pgaio_workerset_pop_lowest(&workerset));
+}
+
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
new_head = (queue->head + 1) & (queue->size - 1);
if (new_head == queue->tail)
PgAioWorkerSubmissionQueue *queue;
int result;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return -1; /* empty */
uint32 head;
uint32 tail;
+ Assert(LWLockHeldByMeInMode(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE));
+
head = io_worker_submission_queue->head;
tail = io_worker_submission_queue->tail;
{
PgAioHandle **synchronous_ios = NULL;
int nsync = 0;
- Latch *wakeup = NULL;
- int worker;
+ int worker = -1;
Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
break;
}
-
- if (wakeup == NULL)
- {
- /* Choose an idle worker to wake up if we haven't already. */
- worker = pgaio_worker_choose_idle();
- if (worker >= 0)
- wakeup = io_worker_control->workers[worker].latch;
-
- pgaio_debug_io(DEBUG4, staged_ios[i],
- "choosing worker %d",
- worker);
- }
}
+ /* Choose one worker to wake for this batch. */
+ worker = pgaio_worker_choose_idle(-1);
LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ /* Wake up chosen worker. It will wake peers if necessary. */
+ if (worker != -1)
+ pgaio_worker_wake(worker);
}
else
{
nsync = num_staged_ios;
}
- if (wakeup)
- SetLatch(wakeup);
-
/* Run whatever is left synchronously. */
if (nsync > 0)
{
static void
pgaio_worker_die(int code, Datum arg)
{
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- Assert(io_worker_control->workers[MyIoWorkerId].in_use);
- Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+ PgAioWorkerSet notify_set;
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].in_use = false;
- io_worker_control->workers[MyIoWorkerId].latch = NULL;
+ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ pgaio_workerset_remove(&io_worker_control->idle_workerset, MyIoWorkerId);
LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number == MyProcNumber);
+ io_worker_control->workers[MyIoWorkerId].proc_number = INVALID_PROC_NUMBER;
+ Assert(pgaio_workerset_contains(&io_worker_control->workerset, MyIoWorkerId));
+ pgaio_workerset_remove(&io_worker_control->workerset, MyIoWorkerId);
+ notify_set = io_worker_control->workerset;
+ Assert(io_worker_control->nworkers > 0);
+ io_worker_control->nworkers--;
+ Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
+
+ /*
+ * Notify other workers on pool change. This allows the new highest
+ * worker to know that it is now the one that can time out, and closes a
+ * wakeup-loss race described in pgaio_worker_wake().
+ */
+ pgaio_workerset_wake(notify_set);
}
/*
static void
pgaio_worker_register(void)
{
+ PgAioWorkerSet free_workerset;
+ PgAioWorkerSet old_workerset;
+
MyIoWorkerId = -1;
- /*
- * XXX: This could do with more fine-grained locking. But it's also not
- * very common for the number of workers to change at the moment...
- */
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ LWLockAcquire(AioWorkerControlLock, LW_EXCLUSIVE);
+ /* Find lowest unused worker ID. */
+ pgaio_workerset_all(&free_workerset);
+ pgaio_workerset_subtract(&free_workerset, &io_worker_control->workerset);
+ if (!pgaio_workerset_is_empty(&free_workerset))
+ MyIoWorkerId = pgaio_workerset_get_lowest(&free_workerset);
+ if (MyIoWorkerId == -1)
+ elog(ERROR, "couldn't find a free worker ID");
- for (int i = 0; i < MAX_IO_WORKERS; ++i)
- {
- if (!io_worker_control->workers[i].in_use)
- {
- Assert(io_worker_control->workers[i].latch == NULL);
- io_worker_control->workers[i].in_use = true;
- MyIoWorkerId = i;
- break;
- }
- else
- Assert(io_worker_control->workers[i].latch != NULL);
- }
+ Assert(io_worker_control->workers[MyIoWorkerId].proc_number ==
+ INVALID_PROC_NUMBER);
+ io_worker_control->workers[MyIoWorkerId].proc_number = MyProcNumber;
- if (MyIoWorkerId == -1)
- elog(ERROR, "couldn't find a free worker slot");
+ old_workerset = io_worker_control->workerset;
+ Assert(!pgaio_workerset_contains(&old_workerset, MyIoWorkerId));
+ pgaio_workerset_insert(&io_worker_control->workerset, MyIoWorkerId);
+ io_worker_control->nworkers++;
+ Assert(io_worker_control->nworkers <= MAX_IO_WORKERS);
+ Assert(pgaio_workerset_count(&io_worker_control->workerset) ==
+ io_worker_control->nworkers);
+ LWLockRelease(AioWorkerControlLock);
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
- io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
- LWLockRelease(AioWorkerSubmissionQueueLock);
+ /*
+ * Notify other workers on pool change. If we were the highest worker,
+ * this allows the new highest worker to know that it can time out.
+ */
+ pgaio_workerset_wake(old_workerset);
on_shmem_exit(pgaio_worker_die, 0);
}
errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
}
+/*
+ * Check if this backend is allowed to time out, and thus should use a
+ * non-infinite sleep time. Only the highest-numbered worker is allowed to
+ * time out, and only if the pool is above io_min_workers. Serializing
+ * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
+ * io_min_workers.
+ *
+ * The result is only instantaneously true and may be temporarily inconsistent
+ * in different workers around transitions, but all workers are woken up on
+ * pool size or GUC changes making the result eventually consistent.
+ */
+static bool
+pgaio_worker_can_timeout(void)
+{
+ PgAioWorkerSet workerset;
+
+ /* Workers with IDs below io_min_workers never time out. */
+ if (MyIoWorkerId < io_min_workers)
+ return false;
+
+ /* Serialize against pool size changes. */
+ LWLockAcquire(AioWorkerControlLock, LW_SHARED);
+ workerset = io_worker_control->workerset;
+ LWLockRelease(AioWorkerControlLock);
+
+ /* Only the current highest-numbered worker may time out. */
+ if (MyIoWorkerId != pgaio_workerset_get_highest(&workerset))
+ return false;
+
+ return true;
+}
+
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
sigjmp_buf local_sigjmp_buf;
+ TimestampTz idle_timeout_abs = 0;
+ int timeout_guc_used = 0;
PgAioHandle *volatile error_ioh = NULL;
ErrorContextCallback errcallback = {0};
volatile int error_errno = 0;
char cmd[128];
+ int hist_ios = 0;
+ int hist_wakeups = 0;
AuxiliaryProcessMainCommon();
while (!ShutdownRequestPending)
{
uint32 io_index;
- Latch *latches[IO_WORKER_WAKEUP_FANOUT];
- int nlatches = 0;
- int nwakeups = 0;
- int worker;
+ int worker = -1;
+ int queue_depth = 0;
+ bool maybe_grow = false;
/*
* Try to get a job to do.
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
{
- /*
- * Nothing to do. Mark self idle.
- *
- * XXX: Invent some kind of back pressure to reduce useless
- * wakeups?
- */
- io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+ /* Nothing to do. Mark self idle. */
+ pgaio_workerset_insert(&io_worker_control->idle_workerset,
+ MyIoWorkerId);
}
else
{
/* Got one. Clear idle flag. */
- io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+ pgaio_workerset_remove(&io_worker_control->idle_workerset,
+ MyIoWorkerId);
- /* See if we can wake up some peers. */
- nwakeups = Min(pgaio_worker_submission_queue_depth(),
- IO_WORKER_WAKEUP_FANOUT);
- for (int i = 0; i < nwakeups; ++i)
+ /*
+ * See if we should wake up a higher numbered peer. Only do that
+ * if this worker is not receiving spurious wakeups itself. The
+ * intention is to create a frontier beyond which idle workers stay
+ * asleep.
+ *
+ * This heuristic tries to discover the useful wakeup propagation
+ * chain length when IOs are very fast and workers wake up to find
+ * that all IOs have already been taken.
+ *
+ * If we chose not to wake a worker when we ideally should have,
+ * then the ratio will soon change to correct that.
+ */
+ if (hist_wakeups <= hist_ios)
{
- if ((worker = pgaio_worker_choose_idle()) < 0)
- break;
- latches[nlatches++] = io_worker_control->workers[worker].latch;
+ queue_depth = pgaio_worker_submission_queue_depth();
+ if (queue_depth > 0)
+ {
+ /* Choose a worker higher than me to wake. */
+ worker = pgaio_worker_choose_idle(MyIoWorkerId);
+ if (worker == -1)
+ maybe_grow = true;
+ }
}
}
LWLockRelease(AioWorkerSubmissionQueueLock);
- for (int i = 0; i < nlatches; ++i)
- SetLatch(latches[i]);
+ /* Propagate wakeups. */
+ if (worker != -1)
+ {
+ pgaio_worker_wake(worker);
+ }
+ else if (maybe_grow)
+ {
+ /*
+ * We know there was at least one more item in the queue, and we
+ * failed to find a higher-numbered idle worker to wake. Now we
+ * decide if we should try to start one more worker.
+ *
+ * We do this with a simple heuristic: is the queue depth greater
+ * than the current number of workers?
+ *
+ * Consider the following situations:
+ *
+ * 1. The queue depth is constantly increasing, because IOs are
+ * arriving faster than they can possibly be serviced. It doesn't
+ * matter much which threshold we choose, as we will surely hit
+ * it. Crossing the current worker count is a useful signal
+ * because it's clearly too deep to avoid queuing latency already,
+ * but still leaves a small window of opportunity to improve the
+ * situation before the queue overflows.
+ *
+ * 2. The worker pool is keeping up, no latency is being
+ * introduced and an extra worker would be a waste of resources.
+ * Queue depth distributions tend to be heavily skewed, with long
+ * tails of low probability spikes (due to submission clustering,
+ * scheduling, jitter, stalls, noisy neighbors, etc). We want a
+ * number that is very unlikely to be triggered by an outlier, and
+ * we bet that an exponential or similar distribution whose
+ * outliers never reach this threshold must be almost entirely
+ * concentrated at the low end. If we do see a spike as big as
+ * the worker count, we take it as a signal that the distribution
+ * is surely too wide.
+ *
+ * On its own, this is an extremely crude signal. When combined
+ * with the wakeup propagation test that precedes it (but on its
+ * own tends to overshoot) and io_worker_launch_interval, the result
+ * is that we gradually test each pool size until we find one that
+ * doesn't trigger further expansion, and then hold it for at
+ * least io_worker_idle_timeout.
+ *
+ * XXX Perhaps ideas from queueing theory or control theory could
+ * do a better job of this.
+ */
+
+ /* Read nworkers without lock for this heuristic purpose. */
+ if (queue_depth > io_worker_control->nworkers)
+ pgaio_worker_request_grow();
+ }
if (io_index != -1)
{
PgAioHandle *ioh = NULL;
+ /* Cancel timeout and update wakeup:work ratio. */
+ idle_timeout_abs = 0;
+ if (++hist_ios == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
+ {
+ hist_wakeups /= 2;
+ hist_ios /= 2;
+ }
+
ioh = &pgaio_ctl->io_handles[io_index];
error_ioh = ioh;
errcallback.arg = ioh;
}
#endif
+#ifdef PGAIO_WORKER_SHOW_PS_INFO
+ {
+ char *description = pgaio_io_get_target_description(ioh);
+
+ sprintf(cmd, "%d: [%s] %s",
+ MyIoWorkerId,
+ pgaio_io_get_op_name(ioh),
+ description);
+ pfree(description);
+ set_ps_display(cmd);
+ }
+#endif
+
/*
* We don't expect this to ever fail with ERROR or FATAL, no need
* to keep error_ioh set to the IO.
}
else
{
- WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
- WAIT_EVENT_IO_WORKER_MAIN);
+ int timeout_ms;
+
+ /* Cancel new worker request if pending. */
+ pgaio_worker_cancel_grow();
+
+ /* Compute the remaining allowed idle time. */
+ if (io_worker_idle_timeout == -1)
+ {
+ /* Never time out. */
+ timeout_ms = -1;
+ }
+ else
+ {
+ TimestampTz now = GetCurrentTimestamp();
+
+ /* If the GUC changes, reset timer. */
+ if (idle_timeout_abs != 0 &&
+ io_worker_idle_timeout != timeout_guc_used)
+ idle_timeout_abs = 0;
+
+ /* Only the highest-numbered worker can time out. */
+ if (pgaio_worker_can_timeout())
+ {
+ if (idle_timeout_abs == 0)
+ {
+ /*
+ * I have just been promoted to the timeout worker, or
+ * the GUC changed. Compute new absolute time from
+ * now.
+ */
+ idle_timeout_abs =
+ TimestampTzPlusMilliseconds(now,
+ io_worker_idle_timeout);
+ timeout_guc_used = io_worker_idle_timeout;
+ }
+ timeout_ms =
+ TimestampDifferenceMilliseconds(now, idle_timeout_abs);
+ }
+ else
+ {
+ /* No timeout for me. */
+ idle_timeout_abs = 0;
+ timeout_ms = -1;
+ }
+ }
+
+#ifdef PGAIO_WORKER_SHOW_PS_INFO
+ sprintf(cmd, "%d: idle, wakeups:ios = %d:%d",
+ MyIoWorkerId, hist_wakeups, hist_ios);
+ set_ps_display(cmd);
+#endif
+
+ if (WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT,
+ timeout_ms,
+ WAIT_EVENT_IO_WORKER_MAIN) == WL_TIMEOUT)
+ {
+ /* WL_TIMEOUT */
+ if (pgaio_worker_can_timeout())
+ if (GetCurrentTimestamp() >= idle_timeout_abs)
+ break;
+ }
+ else
+ {
+ /* WL_LATCH_SET */
+ if (++hist_wakeups == PGAIO_WORKER_WAKEUP_RATIO_SATURATE)
+ {
+ hist_wakeups /= 2;
+ hist_ios /= 2;
+ }
+ }
ResetLatch(MyLatch);
}
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ /* If io_max_workers has been decreased, exit highest first. */
+ if (MyIoWorkerId >= io_max_workers)
+ break;
}
}