gh-115103: Implement delayed free mechanism for free-threaded builds (#115367)
author     Sam Gross <colesbury@gmail.com>
           Tue, 20 Feb 2024 18:04:37 +0000 (13:04 -0500)
committer  GitHub <noreply@github.com>
           Tue, 20 Feb 2024 18:04:37 +0000 (13:04 -0500)
This adds `_PyMem_FreeDelayed()` and supporting functions. The
`_PyMem_FreeDelayed()` function frees memory with the same allocator as
`PyMem_Free()`, but after some delay to ensure that concurrent lock-free
readers have finished.
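
For illustration, a minimal sketch of the kind of pattern this enables,
assuming a hypothetical shared lookup table; the `shared_table`,
`reader_lookup()` and `writer_replace()` names are invented here, and the
`_Py_atomic_*` helpers are used only for illustration:

    /* Sketch only, under the assumptions above. */
    #include "Python.h"
    #include "pycore_pymem.h"   // _PyMem_FreeDelayed() (internal API)

    typedef struct {
        Py_ssize_t size;
        PyObject *items[8];
    } shared_table;

    static shared_table *table;  // shared; read by other threads without a lock

    static PyObject *
    reader_lookup(Py_ssize_t i)
    {
        // Lock-free read: `table` may be swapped concurrently, but the old
        // table is freed only after this thread passes a quiescent state,
        // so the accesses below never touch freed memory.
        shared_table *t = _Py_atomic_load_ptr(&table);
        return (t != NULL && i < t->size) ? t->items[i] : NULL;
    }

    static void
    writer_replace(shared_table *new_table)
    {
        shared_table *old = table;
        _Py_atomic_store_ptr_release(&table, new_table);
        // Readers may still hold `old`, so defer the free rather than
        // calling PyMem_Free() immediately.
        _PyMem_FreeDelayed(old);
    }

In the default (with-GIL) build `_PyMem_FreeDelayed()` simply calls
`PyMem_Free()`; the queueing below only happens under `Py_GIL_DISABLED`.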

Include/internal/pycore_interp.h
Include/internal/pycore_pymem.h
Include/internal/pycore_pymem_init.h
Include/internal/pycore_runtime_init.h
Include/internal/pycore_tstate.h
Objects/obmalloc.c
Python/pylifecycle.c
Python/pystate.c

diff --git a/Include/internal/pycore_interp.h b/Include/internal/pycore_interp.h
index 06eba665c80e930ed4dd13450e2eb4cf9ca253cf..6a00aafea73779a004048b7565e11c550e1dec19 100644
@@ -231,6 +231,7 @@ struct _is {
 
     struct _Py_dict_state dict_state;
     struct _Py_exc_state exc_state;
+    struct _Py_mem_interp_free_queue mem_free_queue;
 
     struct ast_state ast;
     struct types_state types;
diff --git a/Include/internal/pycore_pymem.h b/Include/internal/pycore_pymem.h
index 1a72d07b50b738e2ee863fb7421e52f68b9da804..1aea91abc5d69f42ee4c5a7b5c34bd672414e267 100644
@@ -1,6 +1,7 @@
 #ifndef Py_INTERNAL_PYMEM_H
 #define Py_INTERNAL_PYMEM_H
 
+#include "pycore_llist.h"           // struct llist_node
 #include "pycore_lock.h"            // PyMutex
 
 #ifdef __cplusplus
@@ -48,6 +49,11 @@ struct _pymem_allocators {
     PyObjectArenaAllocator obj_arena;
 };
 
+struct _Py_mem_interp_free_queue {
+    int has_work;   // true if the queue is not empty
+    PyMutex mutex;  // protects the queue
+    struct llist_node head;  // queue of _mem_work_chunk items
+};
 
 /* Set the memory allocator of the specified domain to the default.
    Save the old allocator into *old_alloc if it's non-NULL.
@@ -110,6 +116,19 @@ extern int _PyMem_SetupAllocators(PyMemAllocatorName allocator);
 /* Is the debug allocator enabled? */
 extern int _PyMem_DebugEnabled(void);
 
+// Enqueue a pointer to be freed possibly after some delay.
+extern void _PyMem_FreeDelayed(void *ptr);
+
+// Periodically process delayed free requests.
+extern void _PyMem_ProcessDelayed(PyThreadState *tstate);
+
+// Abandon all thread-local delayed free requests and push them to the
+// interpreter's queue.
+extern void _PyMem_AbandonDelayed(PyThreadState *tstate);
+
+// On interpreter shutdown, frees all delayed free requests.
+extern void _PyMem_FiniDelayed(PyInterpreterState *interp);
+
 #ifdef __cplusplus
 }
 #endif
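
Taken together, the four entry points cover a thread's whole lifetime; the
actual call sites are added in Objects/obmalloc.c, Python/pystate.c and
Python/pylifecycle.c below. A rough sketch of the intended order (the wrapper
function names here are illustrative only):

    /* Sketch only: expected order of the new calls in a free-threaded build. */
    static void
    retire_pointer(PyThreadState *tstate, void *ptr)
    {
        _PyMem_FreeDelayed(ptr);        // enqueue; freed once lock-free readers
                                        // have passed a quiescent state
        _PyMem_ProcessDelayed(tstate);  // drain the thread-local queue and,
                                        // opportunistically, the interpreter's
    }

    static void
    thread_exit(PyThreadState *tstate)
    {
        _PyMem_AbandonDelayed(tstate);  // PyThreadState_Clear(): push leftover
                                        // work onto the interpreter-wide queue
    }

    static void
    interp_shutdown(PyInterpreterState *interp)
    {
        _PyMem_FiniDelayed(interp);     // finalize_interp_clear(): free anything
                                        // still pending, without waiting
    }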
diff --git a/Include/internal/pycore_pymem_init.h b/Include/internal/pycore_pymem_init.h
index 96c49ed7338d6d6a34d2808ef0f4841038346c1f..c593edc86d9952aa9b001006122460ee6f215c44 100644
@@ -92,6 +92,11 @@ extern void _PyMem_ArenaFree(void *, void *, size_t);
     { NULL, _PyMem_ArenaAlloc, _PyMem_ArenaFree }
 
 
+#define _Py_mem_free_queue_INIT(queue) \
+    { \
+        .head = LLIST_INIT(queue.head), \
+    }
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/Include/internal/pycore_runtime_init.h b/Include/internal/pycore_runtime_init.h
index be81604d653814c56917ad9cfa58d38ce1d919f0..d093047d4bc09da2d1a4d62c5f2f5062c4bef756 100644
@@ -176,6 +176,7 @@ extern PyTypeObject _PyExc_MemoryError;
         }, \
         .dtoa = _dtoa_state_INIT(&(INTERP)), \
         .dict_state = _dict_state_INIT, \
+        .mem_free_queue = _Py_mem_free_queue_INIT(INTERP.mem_free_queue), \
         .func_state = { \
             .next_version = 1, \
         }, \
diff --git a/Include/internal/pycore_tstate.h b/Include/internal/pycore_tstate.h
index d0f980ed49ee3e236779d1a608a8c786d886bd79..e268e6fbbb087b24f98926df1bd646e84d40645b 100644
@@ -29,6 +29,7 @@ typedef struct _PyThreadStateImpl {
     PyThreadState base;
 
     struct _qsbr_thread_state *qsbr;  // only used by free-threaded build
+    struct llist_node mem_free_queue; // delayed free queue
 
 #ifdef Py_GIL_DISABLED
     struct _gc_thread_state gc;
diff --git a/Objects/obmalloc.c b/Objects/obmalloc.c
index 6a12c3dca38b36de6580ef10c2d249030a5086c2..9bf4eeb9c822a50d495611ce9a859ffcee60755e 100644
@@ -948,6 +948,196 @@ _PyMem_Strdup(const char *str)
     return copy;
 }
 
+/***********************************************/
+/* Delayed freeing support for Py_GIL_DISABLED */
+/***********************************************/
+
+// So that sizeof(struct _mem_work_chunk) is 4096 bytes on 64-bit platforms.
+#define WORK_ITEMS_PER_CHUNK 254
+
+// A pointer to be freed once the QSBR read sequence reaches qsbr_goal.
+struct _mem_work_item {
+    void *ptr;
+    uint64_t qsbr_goal;
+};
+
+// A fixed-size buffer of pointers to be freed
+struct _mem_work_chunk {
+    // Linked list node of chunks in queue
+    struct llist_node node;
+
+    Py_ssize_t rd_idx;  // index of next item to read
+    Py_ssize_t wr_idx;  // index of next item to write
+    struct _mem_work_item array[WORK_ITEMS_PER_CHUNK];
+};
+
+void
+_PyMem_FreeDelayed(void *ptr)
+{
+#ifndef Py_GIL_DISABLED
+    PyMem_Free(ptr);
+#else
+    if (_PyRuntime.stoptheworld.world_stopped) {
+        // Free immediately if the world is stopped, including during
+        // interpreter shutdown.
+        PyMem_Free(ptr);
+        return;
+    }
+
+    _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)_PyThreadState_GET();
+    struct llist_node *head = &tstate->mem_free_queue;
+
+    struct _mem_work_chunk *buf = NULL;
+    if (!llist_empty(head)) {
+        // Try to re-use the last buffer
+        buf = llist_data(head->prev, struct _mem_work_chunk, node);
+        if (buf->wr_idx == WORK_ITEMS_PER_CHUNK) {
+            // already full
+            buf = NULL;
+        }
+    }
+
+    if (buf == NULL) {
+        buf = PyMem_Calloc(1, sizeof(*buf));
+        if (buf != NULL) {
+            llist_insert_tail(head, &buf->node);
+        }
+    }
+
+    if (buf == NULL) {
+        // failed to allocate a buffer, free immediately
+        _PyEval_StopTheWorld(tstate->base.interp);
+        PyMem_Free(ptr);
+        _PyEval_StartTheWorld(tstate->base.interp);
+        return;
+    }
+
+    assert(buf != NULL && buf->wr_idx < WORK_ITEMS_PER_CHUNK);
+    uint64_t seq = _Py_qsbr_deferred_advance(tstate->qsbr);
+    buf->array[buf->wr_idx].ptr = ptr;
+    buf->array[buf->wr_idx].qsbr_goal = seq;
+    buf->wr_idx++;
+
+    if (buf->wr_idx == WORK_ITEMS_PER_CHUNK) {
+        _PyMem_ProcessDelayed((PyThreadState *)tstate);
+    }
+#endif
+}
+
+static struct _mem_work_chunk *
+work_queue_first(struct llist_node *head)
+{
+    return llist_data(head->next, struct _mem_work_chunk, node);
+}
+
+static void
+process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr,
+              bool keep_empty)
+{
+    while (!llist_empty(head)) {
+        struct _mem_work_chunk *buf = work_queue_first(head);
+
+        while (buf->rd_idx < buf->wr_idx) {
+            struct _mem_work_item *item = &buf->array[buf->rd_idx];
+            if (!_Py_qsbr_poll(qsbr, item->qsbr_goal)) {
+                return;
+            }
+
+            PyMem_Free(item->ptr);
+            buf->rd_idx++;
+        }
+
+        assert(buf->rd_idx == buf->wr_idx);
+        if (keep_empty && buf->node.next == head) {
+            // Keep the last buffer in the queue to reduce re-allocations
+            buf->rd_idx = buf->wr_idx = 0;
+            return;
+        }
+
+        llist_remove(&buf->node);
+        PyMem_Free(buf);
+    }
+}
+
+static void
+process_interp_queue(struct _Py_mem_interp_free_queue *queue,
+                     struct _qsbr_thread_state *qsbr)
+{
+    if (!_Py_atomic_load_int_relaxed(&queue->has_work)) {
+        return;
+    }
+
+    // Try to acquire the lock, but don't block if it's already held.
+    if (_PyMutex_LockTimed(&queue->mutex, 0, 0) == PY_LOCK_ACQUIRED) {
+        process_queue(&queue->head, qsbr, false);
+
+        int more_work = !llist_empty(&queue->head);
+        _Py_atomic_store_int_relaxed(&queue->has_work, more_work);
+
+        PyMutex_Unlock(&queue->mutex);
+    }
+}
+
+void
+_PyMem_ProcessDelayed(PyThreadState *tstate)
+{
+    PyInterpreterState *interp = tstate->interp;
+    _PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)tstate;
+
+    // Process thread-local work
+    process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true);
+
+    // Process shared interpreter work
+    process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr);
+}
+
+void
+_PyMem_AbandonDelayed(PyThreadState *tstate)
+{
+    PyInterpreterState *interp = tstate->interp;
+    struct llist_node *queue = &((_PyThreadStateImpl *)tstate)->mem_free_queue;
+
+    if (llist_empty(queue)) {
+        return;
+    }
+
+    // Check if the queue contains one empty buffer
+    struct _mem_work_chunk *buf = work_queue_first(queue);
+    if (buf->rd_idx == buf->wr_idx) {
+        llist_remove(&buf->node);
+        PyMem_Free(buf);
+        assert(llist_empty(queue));
+        return;
+    }
+
+    // Merge the thread's work queue into the interpreter's work queue.
+    PyMutex_Lock(&interp->mem_free_queue.mutex);
+    llist_concat(&interp->mem_free_queue.head, queue);
+    _Py_atomic_store_int_relaxed(&interp->mem_free_queue.has_work, 1);
+    PyMutex_Unlock(&interp->mem_free_queue.mutex);
+
+    assert(llist_empty(queue));  // the thread's queue is now empty
+}
+
+void
+_PyMem_FiniDelayed(PyInterpreterState *interp)
+{
+    struct llist_node *head = &interp->mem_free_queue.head;
+    while (!llist_empty(head)) {
+        struct _mem_work_chunk *buf = work_queue_first(head);
+
+        while (buf->rd_idx < buf->wr_idx) {
+            // Free the remaining items immediately. There should be no other
+            // threads accessing the memory at this point during shutdown.
+            struct _mem_work_item *item = &buf->array[buf->rd_idx];
+            PyMem_Free(item->ptr);
+            buf->rd_idx++;
+        }
+
+        llist_remove(&buf->node);
+        PyMem_Free(buf);
+    }
+}
 
 /**************************/
 /* the "object" allocator */
diff --git a/Python/pylifecycle.c b/Python/pylifecycle.c
index 656d82136d263b9efd5d03320021f9cc02bebad3..04487345f7ec053d0186aad9b09a437001d937d7 100644
@@ -1837,6 +1837,9 @@ finalize_interp_clear(PyThreadState *tstate)
 
     finalize_interp_types(tstate->interp);
 
+    /* Free any delayed free requests immediately */
+    _PyMem_FiniDelayed(tstate->interp);
+
     /* finalize_interp_types may allocate Python objects so we may need to
        abandon mimalloc segments again */
     _PyThreadState_ClearMimallocHeaps(tstate);
diff --git a/Python/pystate.c b/Python/pystate.c
index 1d8c09653d56296d58b9bd1511e0c4c5ff87fc4a..bb8e24c1dbe12fe1d0e9a1f0973e264ee17c67ac 100644
@@ -617,6 +617,7 @@ init_interpreter(PyInterpreterState *interp,
 #ifdef Py_GIL_DISABLED
     _Py_brc_init_state(interp);
 #endif
+    llist_init(&interp->mem_free_queue.head);
     for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) {
         interp->monitors.tools[i] = 0;
     }
@@ -1353,6 +1354,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
     // Initialize biased reference counting inter-thread queue
     _Py_brc_init_thread(tstate);
 #endif
+    llist_init(&_tstate->mem_free_queue);
 
     if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
         // Start in the suspended state if there is an ongoing stop-the-world.
@@ -1574,6 +1576,7 @@ PyThreadState_Clear(PyThreadState *tstate)
         // don't call _PyInterpreterState_SetNotRunningMain() yet.
         tstate->on_delete(tstate->on_delete_data);
     }
+
 #ifdef Py_GIL_DISABLED
     // Each thread should clear own freelists in free-threading builds.
     struct _Py_object_freelists *freelists = _Py_object_freelists_GET();
@@ -1583,6 +1586,9 @@ PyThreadState_Clear(PyThreadState *tstate)
     _Py_brc_remove_thread(tstate);
 #endif
 
+    // Merge our queue of pointers to be freed into the interpreter queue.
+    _PyMem_AbandonDelayed(tstate);
+
     _PyThreadState_ClearMimallocHeaps(tstate);
 
     tstate->_status.cleared = 1;