#define IO_WORKER_WAKEUP_FANOUT 2
-typedef struct AioWorkerSubmissionQueue
+typedef struct PgAioWorkerSubmissionQueue
{
uint32 size;
uint32 mask;
uint32 head;
uint32 tail;
- uint32 ios[FLEXIBLE_ARRAY_MEMBER];
-} AioWorkerSubmissionQueue;
+ uint32 sqes[FLEXIBLE_ARRAY_MEMBER];
+} PgAioWorkerSubmissionQueue;
-typedef struct AioWorkerSlot
+typedef struct PgAioWorkerSlot
{
Latch *latch;
bool in_use;
-} AioWorkerSlot;
+} PgAioWorkerSlot;
-typedef struct AioWorkerControl
+typedef struct PgAioWorkerControl
{
uint64 idle_worker_mask;
- AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
-} AioWorkerControl;
+ PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
+} PgAioWorkerControl;
static size_t pgaio_worker_shmem_size(void);
static int io_worker_queue_size = 64;
static int MyIoWorkerId;
-static AioWorkerSubmissionQueue *io_worker_submission_queue;
-static AioWorkerControl *io_worker_control;
+static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
+static PgAioWorkerControl *io_worker_control;
static size_t
/* Round size up to next power of two so we can make a mask. */
*queue_size = pg_nextpower2_32(io_worker_queue_size);
- return offsetof(AioWorkerSubmissionQueue, ios) +
+ return offsetof(PgAioWorkerSubmissionQueue, sqes) +
sizeof(uint32) * *queue_size;
}
static size_t
pgaio_worker_control_shmem_size(void)
{
- return offsetof(AioWorkerControl, workers) +
- sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
+ return offsetof(PgAioWorkerControl, workers) +
+ sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
}
static size_t
}
static int
-pgaio_choose_idle_worker(void)
+pgaio_worker_choose_idle(void)
{
int worker;
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
- AioWorkerSubmissionQueue *queue;
+ PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
queue = io_worker_submission_queue;
return false; /* full */
}
- queue->ios[queue->head] = pgaio_io_get_id(ioh);
+ queue->sqes[queue->head] = pgaio_io_get_id(ioh);
queue->head = new_head;
return true;
static uint32
pgaio_worker_submission_queue_consume(void)
{
- AioWorkerSubmissionQueue *queue;
+ PgAioWorkerSubmissionQueue *queue;
uint32 result;
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return UINT32_MAX; /* empty */
- result = queue->ios[queue->tail];
+ result = queue->sqes[queue->tail];
queue->tail = (queue->tail + 1) & (queue->size - 1);
return result;
}
static void
-pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
+pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
int nsync = 0;
Latch *wakeup = NULL;
int worker;
- Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+ Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- for (int i = 0; i < nios; ++i)
+ for (int i = 0; i < num_staged_ios; ++i)
{
- Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
- if (!pgaio_worker_submission_queue_insert(ios[i]))
+ Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
+ if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
{
/*
* We'll do it synchronously, but only after we've sent as many as
* we can to workers, to maximize concurrency.
*/
- synchronous_ios[nsync++] = ios[i];
+ synchronous_ios[nsync++] = staged_ios[i];
continue;
}
if (wakeup == NULL)
{
/* Choose an idle worker to wake up if we haven't already. */
- worker = pgaio_choose_idle_worker();
+ worker = pgaio_worker_choose_idle();
if (worker >= 0)
wakeup = io_worker_control->workers[worker].latch;
- pgaio_debug_io(DEBUG4, ios[i],
+ pgaio_debug_io(DEBUG4, staged_ios[i],
"choosing worker %d",
worker);
}
IO_WORKER_WAKEUP_FANOUT);
for (int i = 0; i < nwakeups; ++i)
{
- if ((worker = pgaio_choose_idle_worker()) < 0)
+ if ((worker = pgaio_worker_choose_idle()) < 0)
break;
latches[nlatches++] = io_worker_control->workers[worker].latch;
}