]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Use atomics to avoid locking in InjectionPointRun()
author Heikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 15 Jul 2024 07:21:16 +0000 (10:21 +0300)
committer Heikki Linnakangas <heikki.linnakangas@iki.fi>
Mon, 15 Jul 2024 07:33:09 +0000 (10:33 +0300)
This allows using injection points without having a PGPROC, like early
at backend startup, or in the postmaster.

The injection points facility is new in v17, so backpatch there.

Reviewed-by: Michael Paquier <michael@paquier.xyz>
Discussion: https://www.postgresql.org/message-id/4317a7f7-8d24-435e-9e49-29b72a3dc418@iki.fi

src/backend/utils/misc/injection_point.c
src/tools/pgindent/typedefs.list

index afae0dbedf416ce9f09f37f5925ff738245274d4..3c63a8ace861bc94f21f3cf42e1c0fb2cf3293bf 100644 (file)
@@ -21,7 +21,6 @@
 
 #include "fmgr.h"
 #include "miscadmin.h"
-#include "port/pg_bitutils.h"
 #include "storage/fd.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 
 #ifdef USE_INJECTION_POINTS
 
-/*
- * Hash table for storing injection points.
- *
- * InjectionPointHash is used to find an injection point by name.
- */
-static HTAB *InjectionPointHash;       /* find points from names */
-
 /* Field sizes */
 #define INJ_NAME_MAXLEN                64
 #define INJ_LIB_MAXLEN         128
 #define INJ_FUNC_MAXLEN                128
 #define INJ_PRIVATE_MAXLEN     1024
 
-/* Single injection point stored in InjectionPointHash */
+/* Single injection point stored in shared memory */
 typedef struct InjectionPointEntry
 {
+       /*
+        * Because injection points need to be usable without LWLocks, we use a
+        * generation counter on each entry to allow safe, lock-free reading.
+        *
+        * To read an entry, first read the current 'generation' value.  If it's
+        * even, then the slot is currently unused, and odd means it's in use.
+        * When reading the other fields, beware that they may change while
+        * reading them, if the entry is released and reused!  After reading the
+        * other fields, read 'generation' again: if its value hasn't changed, you
+        * can be certain that the other fields you read are valid.  Otherwise,
+        * the slot was concurrently recycled, and you should ignore it.
+        *
+        * When adding an entry, you must store all the other fields first, and
+        * then update the generation number, with an appropriate memory barrier
+        * in between. In addition to that protocol, you must also hold
+        * InjectionPointLock, to prevent two backends from modifying the array at
+        * the same time.
+        */
+       pg_atomic_uint64 generation;
+
        char            name[INJ_NAME_MAXLEN];  /* hash key */
        char            library[INJ_LIB_MAXLEN];        /* library */
        char            function[INJ_FUNC_MAXLEN];      /* function */
@@ -58,8 +70,22 @@ typedef struct InjectionPointEntry
        char            private_data[INJ_PRIVATE_MAXLEN];
 } InjectionPointEntry;
 
-#define INJECTION_POINT_HASH_INIT_SIZE 16
-#define INJECTION_POINT_HASH_MAX_SIZE  128
+#define MAX_INJECTION_POINTS   128
+
+/*
+ * Shared memory array of active injection points.
+ *
+ * 'max_inuse' is the highest index currently in use, plus one.  It's just an
+ * optimization to avoid scanning through the whole array, in the common case
+ * that there are no injection points, or only a few.
+ */
+typedef struct InjectionPointsCtl
+{
+       pg_atomic_uint32 max_inuse;
+       InjectionPointEntry entries[MAX_INJECTION_POINTS];
+} InjectionPointsCtl;
+
+static InjectionPointsCtl *ActiveInjectionPoints;
 
 /*
  * Backend local cache of injection callbacks already loaded, stored in
@@ -70,6 +96,14 @@ typedef struct InjectionPointCacheEntry
        char            name[INJ_NAME_MAXLEN];
        char            private_data[INJ_PRIVATE_MAXLEN];
        InjectionPointCallback callback;
+
+       /*
+        * Shmem slot and copy of its generation number when this cache entry was
+        * created.  They can be used to validate if the cached entry is still
+        * valid.
+        */
+       int                     slot_idx;
+       uint64          generation;
 } InjectionPointCacheEntry;
 
 static HTAB *InjectionPointCache = NULL;
@@ -79,8 +113,10 @@ static HTAB *InjectionPointCache = NULL;
  *
  * Add an injection point to the local cache.
  */
-static void
+static InjectionPointCacheEntry *
 injection_point_cache_add(const char *name,
+                                                 int slot_idx,
+                                                 uint64 generation,
                                                  InjectionPointCallback callback,
                                                  const void *private_data)
 {
@@ -97,7 +133,7 @@ injection_point_cache_add(const char *name,
                hash_ctl.hcxt = TopMemoryContext;
 
                InjectionPointCache = hash_create("InjectionPoint cache hash",
-                                                                                 INJECTION_POINT_HASH_MAX_SIZE,
+                                                                                 MAX_INJECTION_POINTS,
                                                                                  &hash_ctl,
                                                                                  HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
        }
@@ -107,9 +143,12 @@ injection_point_cache_add(const char *name,
 
        Assert(!found);
        strlcpy(entry->name, name, sizeof(entry->name));
+       entry->slot_idx = slot_idx;
+       entry->generation = generation;
        entry->callback = callback;
-       if (private_data != NULL)
-               memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+       memcpy(entry->private_data, private_data, INJ_PRIVATE_MAXLEN);
+
+       return entry;
 }
 
 /*
@@ -122,11 +161,43 @@ injection_point_cache_add(const char *name,
 static void
 injection_point_cache_remove(const char *name)
 {
-       /* leave if no cache */
-       if (InjectionPointCache == NULL)
-               return;
+       bool            found PG_USED_FOR_ASSERTS_ONLY;
+
+       (void) hash_search(InjectionPointCache, name, HASH_REMOVE, &found);
+       Assert(found);
+}
+
+/*
+ * injection_point_cache_load
+ *
+ * Load an injection point into the local cache.
+ */
+static InjectionPointCacheEntry *
+injection_point_cache_load(InjectionPointEntry *entry, int slot_idx, uint64 generation)
+{
+       char            path[MAXPGPATH];
+       void       *injection_callback_local;
 
-       (void) hash_search(InjectionPointCache, name, HASH_REMOVE, NULL);
+       snprintf(path, MAXPGPATH, "%s/%s%s", pkglib_path,
+                        entry->library, DLSUFFIX);
+
+       if (!pg_file_exists(path))
+               elog(ERROR, "could not find library \"%s\" for injection point \"%s\"",
+                        path, entry->name);
+
+       injection_callback_local = (void *)
+               load_external_function(path, entry->function, false, NULL);
+
+       if (injection_callback_local == NULL)
+               elog(ERROR, "could not find function \"%s\" in library \"%s\" for injection point \"%s\"",
+                        entry->function, path, entry->name);
+
+       /* add it to the local cache */
+       return injection_point_cache_add(entry->name,
+                                                                        slot_idx,
+                                                                        generation,
+                                                                        injection_callback_local,
+                                                                        entry->private_data);
 }
 
 /*
@@ -134,15 +205,12 @@ injection_point_cache_remove(const char *name)
  *
  * Retrieve an injection point from the local cache, if any.
  */
-static InjectionPointCallback
-injection_point_cache_get(const char *name, const void **private_data)
+static InjectionPointCacheEntry *
+injection_point_cache_get(const char *name)
 {
        bool            found;
        InjectionPointCacheEntry *entry;
 
-       if (private_data)
-               *private_data = NULL;
-
        /* no callback if no cache yet */
        if (InjectionPointCache == NULL)
                return NULL;
@@ -151,11 +219,7 @@ injection_point_cache_get(const char *name, const void **private_data)
                hash_search(InjectionPointCache, name, HASH_FIND, &found);
 
        if (found)
-       {
-               if (private_data)
-                       *private_data = entry->private_data;
-               return entry->callback;
-       }
+               return entry;
 
        return NULL;
 }
@@ -170,8 +234,7 @@ InjectionPointShmemSize(void)
 #ifdef USE_INJECTION_POINTS
        Size            sz = 0;
 
-       sz = add_size(sz, hash_estimate_size(INJECTION_POINT_HASH_MAX_SIZE,
-                                                                                sizeof(InjectionPointEntry)));
+       sz = add_size(sz, sizeof(InjectionPointsCtl));
        return sz;
 #else
        return 0;
@@ -185,16 +248,20 @@ void
 InjectionPointShmemInit(void)
 {
 #ifdef USE_INJECTION_POINTS
-       HASHCTL         info;
-
-       /* key is a NULL-terminated string */
-       info.keysize = sizeof(char[INJ_NAME_MAXLEN]);
-       info.entrysize = sizeof(InjectionPointEntry);
-       InjectionPointHash = ShmemInitHash("InjectionPoint hash",
-                                                                          INJECTION_POINT_HASH_INIT_SIZE,
-                                                                          INJECTION_POINT_HASH_MAX_SIZE,
-                                                                          &info,
-                                                                          HASH_ELEM | HASH_FIXED_SIZE | HASH_STRINGS);
+       bool            found;
+
+       ActiveInjectionPoints = ShmemInitStruct("InjectionPoint hash",
+                                                                                       sizeof(InjectionPointsCtl),
+                                                                                       &found);
+       if (!IsUnderPostmaster)
+       {
+               Assert(!found);
+               pg_atomic_init_u32(&ActiveInjectionPoints->max_inuse, 0);
+               for (int i = 0; i < MAX_INJECTION_POINTS; i++)
+                       pg_atomic_init_u64(&ActiveInjectionPoints->entries[i].generation, 0);
+       }
+       else
+               Assert(found);
 #endif
 }
 
@@ -209,8 +276,10 @@ InjectionPointAttach(const char *name,
                                         int private_data_size)
 {
 #ifdef USE_INJECTION_POINTS
-       InjectionPointEntry *entry_by_name;
-       bool            found;
+       InjectionPointEntry *entry;
+       uint64          generation;
+       uint32          max_inuse;
+       int                     free_idx;
 
        if (strlen(name) >= INJ_NAME_MAXLEN)
                elog(ERROR, "injection point name %s too long (maximum of %u)",
@@ -230,21 +299,51 @@ InjectionPointAttach(const char *name,
         * exist.  For testing purposes this should be fine.
         */
        LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
-       entry_by_name = (InjectionPointEntry *)
-               hash_search(InjectionPointHash, name,
-                                       HASH_ENTER, &found);
-       if (found)
-               elog(ERROR, "injection point \"%s\" already defined", name);
+       max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+       free_idx = -1;
+
+       for (int idx = 0; idx < max_inuse; idx++)
+       {
+               entry = &ActiveInjectionPoints->entries[idx];
+               generation = pg_atomic_read_u64(&entry->generation);
+               if (generation % 2 == 0)
+               {
+                       /*
+                        * Found a free slot where we can add the new entry, but keep
+                        * going so that we will find out if the entry already exists.
+                        */
+                       if (free_idx == -1)
+                               free_idx = idx;
+               }
+
+               if (strcmp(entry->name, name) == 0)
+                       elog(ERROR, "injection point \"%s\" already defined", name);
+       }
+       if (free_idx == -1)
+       {
+               if (max_inuse == MAX_INJECTION_POINTS)
+                       elog(ERROR, "too many injection points");
+               free_idx = max_inuse;
+       }
+       entry = &ActiveInjectionPoints->entries[free_idx];
+       generation = pg_atomic_read_u64(&entry->generation);
+       Assert(generation % 2 == 0);
 
        /* Save the entry */
-       strlcpy(entry_by_name->name, name, sizeof(entry_by_name->name));
-       entry_by_name->name[INJ_NAME_MAXLEN - 1] = '\0';
-       strlcpy(entry_by_name->library, library, sizeof(entry_by_name->library));
-       entry_by_name->library[INJ_LIB_MAXLEN - 1] = '\0';
-       strlcpy(entry_by_name->function, function, sizeof(entry_by_name->function));
-       entry_by_name->function[INJ_FUNC_MAXLEN - 1] = '\0';
+       strlcpy(entry->name, name, sizeof(entry->name));
+       entry->name[INJ_NAME_MAXLEN - 1] = '\0';
+       strlcpy(entry->library, library, sizeof(entry->library));
+       entry->library[INJ_LIB_MAXLEN - 1] = '\0';
+       strlcpy(entry->function, function, sizeof(entry->function));
+       entry->function[INJ_FUNC_MAXLEN - 1] = '\0';
        if (private_data != NULL)
-               memcpy(entry_by_name->private_data, private_data, private_data_size);
+               memcpy(entry->private_data, private_data, private_data_size);
+
+       pg_write_barrier();
+       pg_atomic_write_u64(&entry->generation, generation + 1);
+
+       if (free_idx + 1 > max_inuse)
+               pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, free_idx + 1);
 
        LWLockRelease(InjectionPointLock);
 
@@ -262,88 +361,177 @@ bool
 InjectionPointDetach(const char *name)
 {
 #ifdef USE_INJECTION_POINTS
-       bool            found;
+       bool            found = false;
+       int                     idx;
+       int                     max_inuse;
 
        LWLockAcquire(InjectionPointLock, LW_EXCLUSIVE);
-       hash_search(InjectionPointHash, name, HASH_REMOVE, &found);
-       LWLockRelease(InjectionPointLock);
 
-       if (!found)
-               return false;
+       /* Find it in the shmem array, and mark the slot as unused */
+       max_inuse = (int) pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+       for (idx = max_inuse - 1; idx >= 0; --idx)
+       {
+               InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+               uint64          generation;
+
+               generation = pg_atomic_read_u64(&entry->generation);
+               if (generation % 2 == 0)
+                       continue;                       /* empty slot */
+
+               if (strcmp(entry->name, name) == 0)
+               {
+                       Assert(!found);
+                       found = true;
+                       pg_atomic_write_u64(&entry->generation, generation + 1);
+                       break;
+               }
+       }
 
-       return true;
+       /* If we just removed the highest-numbered entry, update 'max_inuse' */
+       if (found && idx == max_inuse - 1)
+       {
+               for (; idx >= 0; --idx)
+               {
+                       InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+                       uint64          generation;
+
+                       generation = pg_atomic_read_u64(&entry->generation);
+                       if (generation % 2 != 0)
+                               break;
+               }
+               pg_atomic_write_u32(&ActiveInjectionPoints->max_inuse, idx + 1);
+       }
+       LWLockRelease(InjectionPointLock);
+
+       return found;
 #else
        elog(ERROR, "Injection points are not supported by this build");
        return true;                            /* silence compiler */
 #endif
 }
 
+#ifdef USE_INJECTION_POINTS
 /*
- * Execute an injection point, if defined.
+ * Common workhorse of InjectionPointRun() and InjectionPointLoad()
  *
- * Check first the shared hash table, and adapt the local cache depending
- * on that as it could be possible that an entry to run has been removed.
+ * Checks if an injection point exists in shared memory, and updates
+ * the local cache entry accordingly.
  */
-void
-InjectionPointRun(const char *name)
+static InjectionPointCacheEntry *
+InjectionPointCacheRefresh(const char *name)
 {
-#ifdef USE_INJECTION_POINTS
-       InjectionPointEntry *entry_by_name;
-       bool            found;
-       InjectionPointCallback injection_callback;
-       const void *private_data;
+       uint32          max_inuse;
+       int                     namelen;
+       InjectionPointEntry local_copy;
+       InjectionPointCacheEntry *cached;
 
-       LWLockAcquire(InjectionPointLock, LW_SHARED);
-       entry_by_name = (InjectionPointEntry *)
-               hash_search(InjectionPointHash, name,
-                                       HASH_FIND, &found);
+       /*
+        * First read the number of in-use slots.  More entries can be added or
+        * existing ones can be removed while we're reading them.  If the entry
+        * we're looking for is concurrently added or removed, we might or might
+        * not see it.  That's OK.
+        */
+       max_inuse = pg_atomic_read_u32(&ActiveInjectionPoints->max_inuse);
+       if (max_inuse == 0)
+       {
+               if (InjectionPointCache)
+               {
+                       hash_destroy(InjectionPointCache);
+                       InjectionPointCache = NULL;
+               }
+               return NULL;
+       }
 
        /*
-        * If not found, do nothing and remove it from the local cache if it
-        * existed there.
+        * If we have this entry in the local cache already, check if the cached
+        * entry is still valid.
         */
-       if (!found)
+       cached = injection_point_cache_get(name);
+       if (cached)
        {
+               int                     idx = cached->slot_idx;
+               InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+
+               if (pg_atomic_read_u64(&entry->generation) == cached->generation)
+               {
+                       /* still good */
+                       return cached;
+               }
                injection_point_cache_remove(name);
-               LWLockRelease(InjectionPointLock);
-               return;
+               cached = NULL;
        }
 
        /*
-        * Check if the callback exists in the local cache, to avoid unnecessary
-        * external loads.
+        * Search the shared memory array.
+        *
+        * It's possible that the entry we're looking for is concurrently detached
+        * or attached.  Or detached *and* re-attached, to the same slot or a
+        * different slot.  Detach and re-attach is not an atomic operation, so
+        * it's OK for us to return the old value, NULL, or the new value in such
+        * cases.
         */
-       if (injection_point_cache_get(name, NULL) == NULL)
+       namelen = strlen(name);
+       for (int idx = 0; idx < max_inuse; idx++)
        {
-               char            path[MAXPGPATH];
-               InjectionPointCallback injection_callback_local;
-
-               /* not found in local cache, so load and register */
-               snprintf(path, MAXPGPATH, "%s/%s%s", pkglib_path,
-                                entry_by_name->library, DLSUFFIX);
-
-               if (!pg_file_exists(path))
-                       elog(ERROR, "could not find library \"%s\" for injection point \"%s\"",
-                                path, name);
-
-               injection_callback_local = (InjectionPointCallback)
-                       load_external_function(path, entry_by_name->function, false, NULL);
-
-               if (injection_callback_local == NULL)
-                       elog(ERROR, "could not find function \"%s\" in library \"%s\" for injection point \"%s\"",
-                                entry_by_name->function, path, name);
-
-               /* add it to the local cache when found */
-               injection_point_cache_add(name, injection_callback_local,
-                                                                 entry_by_name->private_data);
+               InjectionPointEntry *entry = &ActiveInjectionPoints->entries[idx];
+               uint64          generation;
+
+               /*
+                * Read the generation number so that we can detect concurrent
+                * modifications.  The read barrier ensures that the generation number
+                * is loaded before any of the other fields.
+                */
+               generation = pg_atomic_read_u64(&entry->generation);
+               if (generation % 2 == 0)
+                       continue;                       /* empty slot */
+               pg_read_barrier();
+
+               /* Is this the injection point we're looking for? */
+               if (memcmp(entry->name, name, namelen + 1) != 0)
+                       continue;
+
+               /*
+                * The entry can change at any time, if the injection point is
+                * concurrently detached.  Copy it to local memory, and re-check the
+                * generation.  If the generation hasn't changed, we know our local
+                * copy is coherent.
+                */
+               memcpy(&local_copy, entry, sizeof(InjectionPointEntry));
+
+               pg_read_barrier();
+               if (pg_atomic_read_u64(&entry->generation) != generation)
+               {
+                       /*
+                        * The entry was concurrently detached.
+                        *
+                        * Continue the search, because if the generation number changed,
+                        * we cannot trust the result of the name comparison we did above.
+                        * It's theoretically possible that it falsely matched a mixed-up
+                        * state of the old and new name, if the slot was recycled with a
+                        * different name.
+                        */
+                       continue;
+               }
+
+               /* Success! Load it into the cache and return it */
+               return injection_point_cache_load(&local_copy, idx, generation);
        }
+       return NULL;
+}
+#endif
 
-       /* Now loaded, so get it. */
-       injection_callback = injection_point_cache_get(name, &private_data);
-
-       LWLockRelease(InjectionPointLock);
+/*
+ * Execute an injection point, if defined.
+ */
+void
+InjectionPointRun(const char *name)
+{
+#ifdef USE_INJECTION_POINTS
+       InjectionPointCacheEntry *cache_entry;
 
-       injection_callback(name, private_data);
+       cache_entry = InjectionPointCacheRefresh(name);
+       if (cache_entry)
+               cache_entry->callback(name, cache_entry->private_data);
 #else
        elog(ERROR, "Injection points are not supported by this build");
 #endif
index d1c97740d7854f4e07c2c2dbb30058aca3312e0e..a84c45ce10cdc3c0b87588d3f9b5cdeb4957cbf8 100644 (file)
@@ -1237,6 +1237,7 @@ InjectionPointCallback
 InjectionPointCondition
 InjectionPointConditionType
 InjectionPointEntry
+InjectionPointsCtl
 InjectionPointSharedState
 InlineCodeBlock
 InsertStmt