#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/shmem.h"
+#include "storage/subsystems.h"
#include "utils/guc.h"
+static void AioShmemRequest(void *arg);
+static void AioShmemInit(void *arg);
+static void AioShmemAttach(void *arg);
-static Size
-AioCtlShmemSize(void)
-{
- /* pgaio_ctl itself */
- return sizeof(PgAioCtl);
-}
+const ShmemCallbacks AioShmemCallbacks = {
+ .request_fn = AioShmemRequest,
+ .init_fn = AioShmemInit,
+ .attach_fn = AioShmemAttach,
+};
+
+static PgAioBackend *AioBackendShmemPtr;
+static PgAioHandle *AioHandleShmemPtr;
+static struct iovec *AioHandleIOVShmemPtr;
+static uint64 *AioHandleDataShmemPtr;
static uint32
AioProcs(void)
return Min(max_proportional_pins, 64);
}
-Size
-AioShmemSize(void)
+/*
+ * Register AIO subsystem's shared memory needs.
+ */
+static void
+AioShmemRequest(void *arg)
{
- Size sz = 0;
-
/*
+ * Resolve io_max_concurrency if not already done
+ *
* We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
* However, if the DBA explicitly set io_max_concurrency = -1 in the
* config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that and
PGC_S_OVERRIDE);
}
- sz = add_size(sz, AioCtlShmemSize());
- sz = add_size(sz, AioBackendShmemSize());
- sz = add_size(sz, AioHandleShmemSize());
- sz = add_size(sz, AioHandleIOVShmemSize());
- sz = add_size(sz, AioHandleDataShmemSize());
-
- /* Reserve space for method specific resources. */
- if (pgaio_method_ops->shmem_size)
- sz = add_size(sz, pgaio_method_ops->shmem_size());
-
- return sz;
+ ShmemRequestStruct(.name = "AioCtl",
+ .size = sizeof(PgAioCtl),
+ .ptr = (void **) &pgaio_ctl,
+ );
+
+ ShmemRequestStruct(.name = "AioBackend",
+ .size = AioBackendShmemSize(),
+ .ptr = (void **) &AioBackendShmemPtr,
+ );
+
+ ShmemRequestStruct(.name = "AioHandle",
+ .size = AioHandleShmemSize(),
+ .ptr = (void **) &AioHandleShmemPtr,
+ );
+
+ ShmemRequestStruct(.name = "AioHandleIOV",
+ .size = AioHandleIOVShmemSize(),
+ .ptr = (void **) &AioHandleIOVShmemPtr,
+ );
+
+ ShmemRequestStruct(.name = "AioHandleData",
+ .size = AioHandleDataShmemSize(),
+ .ptr = (void **) &AioHandleDataShmemPtr,
+ );
+
+ if (pgaio_method_ops->shmem_callbacks.request_fn)
+ pgaio_method_ops->shmem_callbacks.request_fn(pgaio_method_ops->shmem_callbacks.opaque_arg);
}
-void
-AioShmemInit(void)
+/*
+ * Initialize AIO shared memory during postmaster startup.
+ */
+static void
+AioShmemInit(void *arg)
{
- bool found;
uint32 io_handle_off = 0;
uint32 iovec_off = 0;
uint32 per_backend_iovecs = io_max_concurrency * io_max_combine_limit;
- pgaio_ctl = (PgAioCtl *)
- ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found);
-
- if (found)
- goto out;
-
- memset(pgaio_ctl, 0, AioCtlShmemSize());
-
pgaio_ctl->io_handle_count = AioProcs() * io_max_concurrency;
pgaio_ctl->iovec_count = AioProcs() * per_backend_iovecs;
- pgaio_ctl->backend_state = (PgAioBackend *)
- ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found);
-
- pgaio_ctl->io_handles = (PgAioHandle *)
- ShmemInitStruct("AioHandle", AioHandleShmemSize(), &found);
-
- pgaio_ctl->iovecs = (struct iovec *)
- ShmemInitStruct("AioHandleIOV", AioHandleIOVShmemSize(), &found);
- pgaio_ctl->handle_data = (uint64 *)
- ShmemInitStruct("AioHandleData", AioHandleDataShmemSize(), &found);
+ pgaio_ctl->backend_state = AioBackendShmemPtr;
+ pgaio_ctl->io_handles = AioHandleShmemPtr;
+ pgaio_ctl->iovecs = AioHandleIOVShmemPtr;
+ pgaio_ctl->handle_data = AioHandleDataShmemPtr;
for (int procno = 0; procno < AioProcs(); procno++)
{
}
}
-out:
- /* Initialize IO method specific resources. */
- if (pgaio_method_ops->shmem_init)
- pgaio_method_ops->shmem_init(!found);
+ if (pgaio_method_ops->shmem_callbacks.init_fn)
+ pgaio_method_ops->shmem_callbacks.init_fn(pgaio_method_ops->shmem_callbacks.opaque_arg);
+}
+
+static void
+AioShmemAttach(void *arg)
+{
+ if (pgaio_method_ops->shmem_callbacks.attach_fn)
+ pgaio_method_ops->shmem_callbacks.attach_fn(pgaio_method_ops->shmem_callbacks.opaque_arg);
}
void
/* Entry points for IoMethodOps. */
-static size_t pgaio_uring_shmem_size(void);
-static void pgaio_uring_shmem_init(bool first_time);
+static void pgaio_uring_shmem_request(void *arg);
+static void pgaio_uring_shmem_init(void *arg);
static void pgaio_uring_init_backend(void);
static int pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation);
*/
.wait_on_fd_before_close = true,
- .shmem_size = pgaio_uring_shmem_size,
- .shmem_init = pgaio_uring_shmem_init,
+ .shmem_callbacks.request_fn = pgaio_uring_shmem_request,
+ .shmem_callbacks.init_fn = pgaio_uring_shmem_init,
.init_backend = pgaio_uring_init_backend,
.submit = pgaio_uring_submit,
{
size_t sz;
+ sz = pgaio_uring_context_shmem_size();
+ sz = add_size(sz, pgaio_uring_ring_shmem_size());
+
+ return sz;
+}
+
+static void
+pgaio_uring_shmem_request(void *arg)
+{
/*
* Kernel and liburing support for various features influences how much
* shmem we need, perform the necessary checks.
*/
pgaio_uring_check_capabilities();
- sz = pgaio_uring_context_shmem_size();
- sz = add_size(sz, pgaio_uring_ring_shmem_size());
-
- return sz;
+ ShmemRequestStruct(.name = "AioUringContext",
+ .size = pgaio_uring_shmem_size(),
+ .ptr = (void **) &pgaio_uring_contexts,
+ );
}
static void
-pgaio_uring_shmem_init(bool first_time)
+pgaio_uring_shmem_init(void *arg)
{
int TotalProcs = pgaio_uring_procs();
- bool found;
char *shmem;
size_t ring_mem_remain = 0;
char *ring_mem_next = 0;
/*
* We allocate memory for all PgAioUringContext instances and, if
* supported, the memory required for each of the io_uring instances, in
- * one ShmemInitStruct().
+ * one combined allocation.
+ *
+ * pgaio_uring_contexts is already set to the base of the allocation.
*/
- shmem = ShmemInitStruct("AioUringContext", pgaio_uring_shmem_size(), &found);
- if (found)
- return;
-
- pgaio_uring_contexts = (PgAioUringContext *) shmem;
+ shmem = (char *) pgaio_uring_contexts;
shmem += pgaio_uring_context_shmem_size();
/* if supported, handle memory alignment / sizing for io_uring memory */
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/proc.h"
+#include "storage/shmem.h"
#include "tcop/tcopprot.h"
#include "utils/injection_point.h"
#include "utils/memdebug.h"
} PgAioWorkerControl;
-static size_t pgaio_worker_shmem_size(void);
-static void pgaio_worker_shmem_init(bool first_time);
+static void pgaio_worker_shmem_request(void *arg);
+static void pgaio_worker_shmem_init(void *arg);
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
const IoMethodOps pgaio_worker_ops = {
- .shmem_size = pgaio_worker_shmem_size,
- .shmem_init = pgaio_worker_shmem_init,
+ .shmem_callbacks.request_fn = pgaio_worker_shmem_request,
+ .shmem_callbacks.init_fn = pgaio_worker_shmem_init,
.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
.submit = pgaio_worker_submit,
static PgAioWorkerControl *io_worker_control;
-static size_t
-pgaio_worker_queue_shmem_size(int *queue_size)
-{
- /* Round size up to next power of two so we can make a mask. */
- *queue_size = pg_nextpower2_32(io_worker_queue_size);
-
- return offsetof(PgAioWorkerSubmissionQueue, sqes) +
- sizeof(int) * *queue_size;
-}
-
-static size_t
-pgaio_worker_control_shmem_size(void)
-{
- return offsetof(PgAioWorkerControl, workers) +
- sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
-}
-
-static size_t
-pgaio_worker_shmem_size(void)
+static void
+pgaio_worker_shmem_request(void *arg)
{
- size_t sz;
+ size_t size;
int queue_size;
- sz = pgaio_worker_queue_shmem_size(&queue_size);
- sz = add_size(sz, pgaio_worker_control_shmem_size());
-
- return sz;
+ /* Round size up to next power of two so we can make a mask. */
+ queue_size = pg_nextpower2_32(io_worker_queue_size);
+
+ size = offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(int) * queue_size;
+ ShmemRequestStruct(.name = "AioWorkerSubmissionQueue",
+ .size = size,
+ .ptr = (void **) &io_worker_submission_queue,
+ );
+
+ size = offsetof(PgAioWorkerControl, workers) + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
+ ShmemRequestStruct(.name = "AioWorkerControl",
+ .size = size,
+ .ptr = (void **) &io_worker_control,
+ );
}
static void
-pgaio_worker_shmem_init(bool first_time)
+pgaio_worker_shmem_init(void *arg)
{
- bool found;
int queue_size;
- io_worker_submission_queue =
- ShmemInitStruct("AioWorkerSubmissionQueue",
- pgaio_worker_queue_shmem_size(&queue_size),
- &found);
- if (!found)
- {
- io_worker_submission_queue->size = queue_size;
- io_worker_submission_queue->head = 0;
- io_worker_submission_queue->tail = 0;
- }
+ /* Round size up like in pgaio_worker_shmem_request() */
+ queue_size = pg_nextpower2_32(io_worker_queue_size);
- io_worker_control =
- ShmemInitStruct("AioWorkerControl",
- pgaio_worker_control_shmem_size(),
- &found);
- if (!found)
+ io_worker_submission_queue->size = queue_size;
+ io_worker_submission_queue->head = 0;
+ io_worker_submission_queue->tail = 0;
+
+ io_worker_control->idle_worker_mask = 0;
+ for (int i = 0; i < MAX_IO_WORKERS; ++i)
{
- io_worker_control->idle_worker_mask = 0;
- for (int i = 0; i < MAX_IO_WORKERS; ++i)
- {
- io_worker_control->workers[i].latch = NULL;
- io_worker_control->workers[i].in_use = false;
- }
+ io_worker_control->workers[i].latch = NULL;
+ io_worker_control->workers[i].in_use = false;
}
}
size = add_size(size, WaitEventCustomShmemSize());
size = add_size(size, InjectionPointShmemSize());
size = add_size(size, SlotSyncShmemSize());
- size = add_size(size, AioShmemSize());
size = add_size(size, WaitLSNShmemSize());
size = add_size(size, LogicalDecodingCtlShmemSize());
size = add_size(size, DataChecksumsShmemSize());
StatsShmemInit();
WaitEventCustomShmemInit();
InjectionPointShmemInit();
- AioShmemInit();
WaitLSNShmemInit();
LogicalDecodingCtlShmemInit();
}
#include "port/pg_iovec.h"
#include "storage/aio.h"
#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "storage/shmem.h"
/*
*/
bool wait_on_fd_before_close;
-
/* global initialization */
-
- /*
- * Amount of additional shared memory to reserve for the io_method. Called
- * just like a normal ipci.c style *Size() function. Optional.
- */
- size_t (*shmem_size) (void);
-
- /*
- * Initialize shared memory. First time is true if AIO's shared memory was
- * just initialized, false otherwise. Optional.
- */
- void (*shmem_init) (bool first_time);
+ ShmemCallbacks shmem_callbacks;
/*
* Per-backend initialization. Optional.
/* aio_init.c */
-extern Size AioShmemSize(void);
-extern void AioShmemInit(void);
-
extern void pgaio_init_backend(void);
/* other modules that need some shared memory space */
PG_SHMEM_SUBSYSTEM(AsyncShmemCallbacks)
+
+/* AIO subsystem. This delegates to the method-specific callbacks */
+PG_SHMEM_SUBSYSTEM(AioShmemCallbacks)