static void
pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
{
- PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+ PgAioHandle **synchronous_ios = NULL;
int nsync = 0;
Latch *wakeup = NULL;
int worker;
Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
- LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- for (int i = 0; i < num_staged_ios; ++i)
+ if (LWLockConditionalAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE))
{
- Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
- if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
+ for (int i = 0; i < num_staged_ios; ++i)
{
- /*
- * We'll do it synchronously, but only after we've sent as many as
- * we can to workers, to maximize concurrency.
- */
- synchronous_ios[nsync++] = staged_ios[i];
- continue;
- }
+ Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
+ if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
+ {
+ /*
+ * Do the rest synchronously. If the queue is full, give up and
+ * do the rest synchronously. We're holding an exclusive lock
+ * on the queue so nothing can consume entries.
+ */
+ synchronous_ios = &staged_ios[i];
+ nsync = (num_staged_ios - i);
+
+ break;
+ }
- if (wakeup == NULL)
- {
- /* Choose an idle worker to wake up if we haven't already. */
- worker = pgaio_worker_choose_idle();
- if (worker >= 0)
- wakeup = io_worker_control->workers[worker].latch;
-
- pgaio_debug_io(DEBUG4, staged_ios[i],
- "choosing worker %d",
- worker);
+ if (wakeup == NULL)
+ {
+ /* Choose an idle worker to wake up if we haven't already. */
+ worker = pgaio_worker_choose_idle();
+ if (worker >= 0)
+ wakeup = io_worker_control->workers[worker].latch;
+
+ pgaio_debug_io(DEBUG4, staged_ios[i],
+ "choosing worker %d",
+ worker);
+ }
}
+ LWLockRelease(AioWorkerSubmissionQueueLock);
+ }
+ else
+ {
+ /* do everything synchronously, no wakeup needed */
+ synchronous_ios = staged_ios;
+ nsync = num_staged_ios;
}
- LWLockRelease(AioWorkerSubmissionQueueLock);
if (wakeup)
SetLatch(wakeup);