gh-110481: Implement inter-thread queue for biased reference counting (#114824)
author    Sam Gross <colesbury@gmail.com>
          Fri, 9 Feb 2024 22:08:32 +0000 (17:08 -0500)
committer GitHub <noreply@github.com>
          Fri, 9 Feb 2024 22:08:32 +0000 (17:08 -0500)
Biased reference counting maintains two refcount fields in each object:
`ob_ref_local` and `ob_ref_shared`. The true refcount is the sum of these two
fields. In some cases, when refcounting operations are split across threads,
the ob_ref_shared field can be negative (although the total refcount must be
at least zero). In this case, the thread that decremented the refcount
requests that the owning thread give up ownership and merge the refcount
fields.
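
Concretely, the split looks like this (a toy illustration, not CPython code;
the real ob_ref_shared field also packs state flags in its low bits):

    /* The true refcount is the sum of the owner's field and the shared one;
     * the shared field may be negative while the total stays non-negative. */
    #include <stdio.h>

    int main(void)
    {
        long ob_ref_local = 2;    /* owner thread's uncontended count */
        long ob_ref_shared = -1;  /* other threads' atomically updated count */
        printf("true refcount = %ld\n", ob_ref_local + ob_ref_shared);  /* 1 */
        return 0;
    }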

21 files changed:
Include/internal/pycore_brc.h [new file with mode: 0644]
Include/internal/pycore_ceval.h
Include/internal/pycore_interp.h
Include/internal/pycore_object_stack.h
Include/internal/pycore_tstate.h
Lib/test/test_code.py
Lib/test/test_concurrent_futures/executor.py
Lib/test/test_concurrent_futures/test_process_pool.py
Makefile.pre.in
Modules/posixmodule.c
Objects/dictobject.c
Objects/object.c
PCbuild/_freeze_module.vcxproj
PCbuild/_freeze_module.vcxproj.filters
PCbuild/pythoncore.vcxproj
PCbuild/pythoncore.vcxproj.filters
Python/brc.c [new file with mode: 0644]
Python/ceval_gil.c
Python/gc_free_threading.c
Python/object_stack.c
Python/pystate.c

diff --git a/Include/internal/pycore_brc.h b/Include/internal/pycore_brc.h
new file mode 100644 (file)
index 0000000..3453d83
--- /dev/null
@@ -0,0 +1,74 @@
+#ifndef Py_INTERNAL_BRC_H
+#define Py_INTERNAL_BRC_H
+
+#include <stdint.h>
+#include "pycore_llist.h"           // struct llist_node
+#include "pycore_lock.h"            // PyMutex
+#include "pycore_object_stack.h"    // _PyObjectStack
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifndef Py_BUILD_CORE
+#  error "this header requires Py_BUILD_CORE define"
+#endif
+
+#ifdef Py_GIL_DISABLED
+
+// Prime number to avoid correlations with memory addresses.
+#define _Py_BRC_NUM_BUCKETS 257
+
+// Hash table bucket
+struct _brc_bucket {
+    // Mutex protects both the bucket and thread state queues in this bucket.
+    PyMutex mutex;
+
+    // Linked list of _PyThreadStateImpl objects hashed to this bucket.
+    struct llist_node root;
+};
+
+// Per-interpreter biased reference counting state
+struct _brc_state {
+    // Hash table of thread states by thread-id. Thread states within a bucket
+    // are chained using a doubly-linked list.
+    struct _brc_bucket table[_Py_BRC_NUM_BUCKETS];
+};
+
+// Per-thread biased reference counting state
+struct _brc_thread_state {
+    // Linked-list of thread states per hash bucket
+    struct llist_node bucket_node;
+
+    // Thread id as determined by _Py_ThreadId()
+    uintptr_t tid;
+
+    // Objects with refcounts to be merged (protected by bucket mutex)
+    _PyObjectStack objects_to_merge;
+
+    // Local stack of objects to be merged (not accessed by other threads)
+    _PyObjectStack local_objects_to_merge;
+};
+
+// Initialize/finalize the per-thread biased reference counting state
+void _Py_brc_init_thread(PyThreadState *tstate);
+void _Py_brc_remove_thread(PyThreadState *tstate);
+
+// Initialize per-interpreter state
+void _Py_brc_init_state(PyInterpreterState *interp);
+
+void _Py_brc_after_fork(PyInterpreterState *interp);
+
+// Enqueue an object to be merged by its owning thread (tid). This
+// steals a reference to the object.
+void _Py_brc_queue_object(PyObject *ob);
+
+// Merge the refcounts of queued objects for the current thread.
+void _Py_brc_merge_refcounts(PyThreadState *tstate);
+
+#endif /* Py_GIL_DISABLED */
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* !Py_INTERNAL_BRC_H */
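
The bucket lists above are intrusive: each _PyThreadStateImpl embeds its own
llist_node, and brc.c later recovers the containing thread state with
llist_data(). A minimal self-contained sketch of the container-of idiom this
assumes (toy names, not the real pycore_llist.h):

    #include <assert.h>
    #include <stddef.h>

    struct toy_node { struct toy_node *next, *prev; };
    struct toy_thread_state { int tid; struct toy_node bucket_node; };

    /* Recover the containing struct from a pointer to its embedded node. */
    #define TOY_LLIST_DATA(ptr, type, member) \
        ((type *)((char *)(ptr) - offsetof(type, member)))

    int main(void)
    {
        struct toy_thread_state ts = { .tid = 42 };
        struct toy_node *n = &ts.bucket_node;
        assert(TOY_LLIST_DATA(n, struct toy_thread_state, bucket_node)->tid == 42);
        return 0;
    }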
diff --git a/Include/internal/pycore_ceval.h b/Include/internal/pycore_ceval.h
index a66af1389541dd72101fda0127a103e5a2ad84ca..b158fc9ff5ebc1b7a867eb72965ef8c752e3eab1 100644 (file)
@@ -206,6 +206,7 @@ void _PyEval_FrameClearAndPop(PyThreadState *tstate, _PyInterpreterFrame *frame)
 #define _PY_ASYNC_EXCEPTION_BIT 3
 #define _PY_GC_SCHEDULED_BIT 4
 #define _PY_EVAL_PLEASE_STOP_BIT 5
+#define _PY_EVAL_EXPLICIT_MERGE_BIT 6
 
 /* Reserve a few bits for future use */
 #define _PY_EVAL_EVENTS_BITS 8
diff --git a/Include/internal/pycore_interp.h b/Include/internal/pycore_interp.h
index f7c332ed747cfac6375b3257292f6a38dff8ba83..31d88071e19d0cf22ab28940040bae85c9bf3598 100644 (file)
@@ -201,6 +201,7 @@ struct _is {
 
 #if defined(Py_GIL_DISABLED)
     struct _mimalloc_interp_state mimalloc;
+    struct _brc_state brc;  // biased reference counting state
 #endif
 
     // Per-interpreter state for the obmalloc allocator.  For the main
diff --git a/Include/internal/pycore_object_stack.h b/Include/internal/pycore_object_stack.h
index 1dc1c1591525deda0a23477962f60a7da50de882..d042be2a98090a0e2194f6c97d00520c7d941564 100644 (file)
@@ -1,6 +1,8 @@
 #ifndef Py_INTERNAL_OBJECT_STACK_H
 #define Py_INTERNAL_OBJECT_STACK_H
 
+#include "pycore_freelist.h"        // _PyFreeListState
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -74,6 +76,10 @@ _PyObjectStack_Pop(_PyObjectStack *stack)
     return obj;
 }
 
+// Merge src into dst, leaving src empty
+extern void
+_PyObjectStack_Merge(_PyObjectStack *dst, _PyObjectStack *src);
+
 // Remove all items from the stack
 extern void
 _PyObjectStack_Clear(_PyObjectStack *stack);
diff --git a/Include/internal/pycore_tstate.h b/Include/internal/pycore_tstate.h
index 472fa08154e8f92da09bb92090f4ec32c2d43273..77a1dc59163d212511e2b4bd85f9c220c0e1ee0f 100644 (file)
@@ -10,6 +10,7 @@ extern "C" {
 
 #include "pycore_freelist.h"      // struct _Py_freelist_state
 #include "pycore_mimalloc.h"      // struct _mimalloc_thread_state
+#include "pycore_brc.h"           // struct _brc_thread_state
 
 
 // Every PyThreadState is actually allocated as a _PyThreadStateImpl. The
@@ -22,6 +23,7 @@ typedef struct _PyThreadStateImpl {
 #ifdef Py_GIL_DISABLED
     struct _mimalloc_thread_state mimalloc;
     struct _Py_freelist_state freelist_state;
+    struct _brc_thread_state brc;
 #endif
 
 } _PyThreadStateImpl;
diff --git a/Lib/test/test_code.py b/Lib/test/test_code.py
index d8fb826edeb681667b0a60dbe0c5d3ea12bf2244..46bebfc7af675b157e17fec912ae27b271728d6e 100644 (file)
@@ -865,6 +865,7 @@ if check_impl_detail(cpython=True) and ctypes is not None:
                     self.test = test
                 def run(self):
                     del self.f
+                    gc_collect()
                     self.test.assertEqual(LAST_FREED, 500)
 
             SetExtra(f.__code__, FREE_INDEX, ctypes.c_voidp(500))
diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py
index 1e7d4344740943d54285fb455d1ca345d3880ad0..6a79fe69ec37cf4a053c8e2a4d4f08fdff975857 100644 (file)
@@ -1,8 +1,10 @@
 import threading
 import time
+import unittest
 import weakref
 from concurrent import futures
 from test import support
+from test.support import Py_GIL_DISABLED
 
 
 def mul(x, y):
@@ -83,10 +85,21 @@ class ExecutorTest:
         my_object_collected = threading.Event()
         my_object_callback = weakref.ref(
             my_object, lambda obj: my_object_collected.set())
-        # Deliberately discarding the future.
-        self.executor.submit(my_object.my_method)
+        fut = self.executor.submit(my_object.my_method)
         del my_object
 
+        if Py_GIL_DISABLED:
+            # Due to biased reference counting, my_object might only be
+            # deallocated while the thread that created it runs -- if the
+            # thread is paused waiting on an event, it may not merge the
+            # refcount of the queued object. For that reason, we wait for the
+            # task to finish (so that it's no longer referenced) and force a
+            # GC to ensure that it is collected.
+            fut.result()  # Wait for the task to finish.
+            support.gc_collect()
+        else:
+            del fut  # Deliberately discard the future.
+
         collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
         self.assertTrue(collected,
                         "Stale reference not collected within timeout.")
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py
index 3e61b0c9387c6fa7ba6ec14825bfad853d7d0fd1..7fc59a05f3deac0bb60662a4d9b01331e01ec336 100644 (file)
@@ -98,6 +98,7 @@ class ProcessPoolExecutorTest(ExecutorTest):
 
         # explicitly destroy the object to ensure that EventfulGCObj.__del__()
         # is called while manager is still running.
+        support.gc_collect()
         obj = None
         support.gc_collect()
 
diff --git a/Makefile.pre.in b/Makefile.pre.in
index 07b2ec7adde78a5406d3064118245c4609d69732..4dabe328ce0362ff5823a3f2352e17d271831bfd 100644 (file)
@@ -405,6 +405,7 @@ PYTHON_OBJS=        \
                Python/ast_opt.o \
                Python/ast_unparse.o \
                Python/bltinmodule.o \
+               Python/brc.o \
                Python/ceval.o \
                Python/codecs.o \
                Python/compile.o \
@@ -1081,6 +1082,7 @@ PYTHON_HEADERS= \
                $(srcdir)/Include/internal/pycore_atexit.h \
                $(srcdir)/Include/internal/pycore_bitutils.h \
                $(srcdir)/Include/internal/pycore_blocks_output_buffer.h \
+               $(srcdir)/Include/internal/pycore_brc.h \
                $(srcdir)/Include/internal/pycore_bytes_methods.h \
                $(srcdir)/Include/internal/pycore_bytesobject.h \
                $(srcdir)/Include/internal/pycore_call.h \
diff --git a/Modules/posixmodule.c b/Modules/posixmodule.c
index e26265fc874ebb4e3936b21d28e6662d4bca8c8e..230c961a2ac3c04eba3e2a9587ba548aaab3b64c 100644 (file)
@@ -637,6 +637,10 @@ PyOS_AfterFork_Child(void)
     tstate->native_thread_id = PyThread_get_thread_native_id();
 #endif
 
+#ifdef Py_GIL_DISABLED
+    _Py_brc_after_fork(tstate->interp);
+#endif
+
     status = _PyEval_ReInitThreads(tstate);
     if (_PyStatus_EXCEPTION(status)) {
         goto fatal_error;
diff --git a/Objects/dictobject.c b/Objects/dictobject.c
index 2df95e977a180fa8c98ddaf985524eb1a1fe78bd..9b1defa5cbc609f52af69a2c66678ca13134dc97 100644 (file)
@@ -5989,6 +5989,18 @@ _PyObject_MakeDictFromInstanceAttributes(PyObject *obj, PyDictValues *values)
     return make_dict_from_instance_attributes(interp, keys, values);
 }
 
+static bool
+has_unique_reference(PyObject *op)
+{
+#ifdef Py_GIL_DISABLED
+    return (_Py_IsOwnedByCurrentThread(op) &&
+            op->ob_ref_local == 1 &&
+            _Py_atomic_load_ssize_relaxed(&op->ob_ref_shared) == 0);
+#else
+    return Py_REFCNT(op) == 1;
+#endif
+}
+
 // Return true if the dict was dematerialized, false otherwise.
 bool
 _PyObject_MakeInstanceAttributesFromDict(PyObject *obj, PyDictOrValues *dorv)
@@ -6005,7 +6017,9 @@ _PyObject_MakeInstanceAttributesFromDict(PyObject *obj, PyDictOrValues *dorv)
         return false;
     }
     assert(_PyType_HasFeature(Py_TYPE(obj), Py_TPFLAGS_HEAPTYPE));
-    if (dict->ma_keys != CACHED_KEYS(Py_TYPE(obj)) || Py_REFCNT(dict) != 1) {
+    if (dict->ma_keys != CACHED_KEYS(Py_TYPE(obj)) ||
+        !has_unique_reference((PyObject *)dict))
+    {
         return false;
     }
     assert(dict->ma_values);
diff --git a/Objects/object.c b/Objects/object.c
index 37a4b7a417e35fabab3d9530732af40a5727d22a..61e6131c6e99bbb97afbe6777808f40080291aed 100644 (file)
@@ -2,6 +2,7 @@
 /* Generic object operations; and implementation of None */
 
 #include "Python.h"
+#include "pycore_brc.h"           // _Py_brc_queue_object()
 #include "pycore_call.h"          // _PyObject_CallNoArgs()
 #include "pycore_ceval.h"         // _Py_EnterRecursiveCallTstate()
 #include "pycore_context.h"       // _PyContextTokenMissing_Type
@@ -344,15 +345,10 @@ _Py_DecRefSharedDebug(PyObject *o, const char *filename, int lineno)
                                                 &shared, new_shared));
 
     if (should_queue) {
-        // TODO: the inter-thread queue is not yet implemented. For now,
-        // we just merge the refcount here.
 #ifdef Py_REF_DEBUG
         _Py_IncRefTotal(_PyInterpreterState_GET());
 #endif
-        Py_ssize_t refcount = _Py_ExplicitMergeRefcount(o, -1);
-        if (refcount == 0) {
-            _Py_Dealloc(o);
-        }
+        _Py_brc_queue_object(o);
     }
     else if (new_shared == _Py_REF_MERGED) {
         // refcount is zero AND merged
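
With this change, a non-owning thread no longer merges the refcount eagerly;
it hands the object to the owner's queue. A toy model of that queue-or-merge
decision (plain C, not CPython's actual field packing):

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct { long local; long shared; bool queued; } toy_obj;

    /* A decref from a thread that does not own the object. */
    static void toy_decref_nonowner(toy_obj *o)
    {
        o->shared -= 1;
        if (o->shared < 0 && !o->queued) {
            o->queued = true;   /* hand off: the owner must merge the fields */
            printf("queued (local=%ld shared=%ld)\n", o->local, o->shared);
        }
    }

    int main(void)
    {
        toy_obj o = { .local = 1, .shared = 0, .queued = false };
        toy_decref_nonowner(&o);   /* shared -> -1, so the object is queued */
        return 0;
    }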
diff --git a/PCbuild/_freeze_module.vcxproj b/PCbuild/_freeze_module.vcxproj
index 35788ec4503e8f62a08a59d84a402bab613277df..49f529ebbc2f9b053191d8fb122008b789bfbbe0 100644 (file)
     <ClCompile Include="..\Python\ast_opt.c" />
     <ClCompile Include="..\Python\ast_unparse.c" />
     <ClCompile Include="..\Python\bltinmodule.c" />
+    <ClCompile Include="..\Python\brc.c" />
     <ClCompile Include="..\Python\bootstrap_hash.c" />
     <ClCompile Include="..\Python\ceval.c" />
     <ClCompile Include="..\Python\codecs.c" />
diff --git a/PCbuild/_freeze_module.vcxproj.filters b/PCbuild/_freeze_module.vcxproj.filters
index 7a44179e3561059cd2c88a8f3bde8e29facc3852..5b1bd7552b4cd9851cce352410802b2cf3d1a929 100644 (file)
@@ -46,6 +46,9 @@
     <ClCompile Include="..\Python\bltinmodule.c">
       <Filter>Source Files</Filter>
     </ClCompile>
+    <ClCompile Include="..\Python\brc.c">
+      <Filter>Python</Filter>
+    </ClCompile>
     <ClCompile Include="..\Objects\boolobject.c">
       <Filter>Source Files</Filter>
     </ClCompile>
diff --git a/PCbuild/pythoncore.vcxproj b/PCbuild/pythoncore.vcxproj
index e1ff97659659eea3ae0c11fd484740978e6314cd..4cc0ca4b9af8deb9c31270dfbbeeec686e45a736 100644 (file)
     <ClInclude Include="..\Include\internal\pycore_ast_state.h" />
     <ClInclude Include="..\Include\internal\pycore_atexit.h" />
     <ClInclude Include="..\Include\internal\pycore_bitutils.h" />
+    <ClInclude Include="..\Include\internal\pycore_brc.h" />
     <ClInclude Include="..\Include\internal\pycore_bytes_methods.h" />
     <ClInclude Include="..\Include\internal\pycore_bytesobject.h" />
     <ClInclude Include="..\Include\internal\pycore_call.h" />
     <ClCompile Include="..\Python\ast_unparse.c" />
     <ClCompile Include="..\Python\bltinmodule.c" />
     <ClCompile Include="..\Python\bootstrap_hash.c" />
+    <ClCompile Include="..\Python\brc.c" />
     <ClCompile Include="..\Python\ceval.c" />
     <ClCompile Include="..\Python\codecs.c" />
     <ClCompile Include="..\Python\compile.c" />
diff --git a/PCbuild/pythoncore.vcxproj.filters b/PCbuild/pythoncore.vcxproj.filters
index 4c55f23006b2f0c5de98fb10b0187e6ef385f89a..ceaa21217267cf1dca3931d00de5dc67bb1435e1 100644 (file)
     <ClInclude Include="..\Include\internal\pycore_bitutils.h">
       <Filter>Include\internal</Filter>
     </ClInclude>
+    <ClInclude Include="..\Include\internal\pycore_brc.h">
+      <Filter>Include\internal</Filter>
+    </ClInclude>
     <ClInclude Include="..\Include\internal\pycore_bytes_methods.h">
       <Filter>Include\internal</Filter>
     </ClInclude>
     <ClCompile Include="..\Python\bltinmodule.c">
       <Filter>Python</Filter>
     </ClCompile>
+    <ClCompile Include="..\Python\brc.c">
+      <Filter>Python</Filter>
+    </ClCompile>
     <ClCompile Include="..\Python\ceval.c">
       <Filter>Python</Filter>
     </ClCompile>
diff --git a/Python/brc.c b/Python/brc.c
new file mode 100644 (file)
index 0000000..f1fd57a
--- /dev/null
@@ -0,0 +1,198 @@
+// Implementation of biased reference counting inter-thread queue.
+//
+// Biased reference counting maintains two refcount fields in each object:
+// ob_ref_local and ob_ref_shared. The true refcount is the sum of these two
+// fields. In some cases, when refcounting operations are split across threads,
+// the ob_ref_shared field can be negative (although the total refcount must
+// be at least zero). In this case, the thread that decremented the refcount
+// requests that the owning thread give up ownership and merge the refcount
+// fields. This file implements the mechanism for doing so.
+//
+// Each thread state maintains a queue of objects whose refcounts it should
+// merge. The thread states are stored in a per-interpreter hash table by
+// thread id. The hash table has a fixed size and uses a linked list to store
+// thread states within each bucket.
+//
+// The queueing thread uses the eval breaker mechanism to notify the owning
+// thread that it has objects to merge. Additionally, all queued objects are
+// merged during GC.
+#include "Python.h"
+#include "pycore_object.h"      // _Py_ExplicitMergeRefcount
+#include "pycore_brc.h"         // struct _brc_thread_state
+#include "pycore_ceval.h"       // _Py_set_eval_breaker_bit
+#include "pycore_llist.h"       // struct llist_node
+#include "pycore_pystate.h"     // _PyThreadStateImpl
+
+#ifdef Py_GIL_DISABLED
+
+// Get the hashtable bucket for a given thread id.
+static struct _brc_bucket *
+get_bucket(PyInterpreterState *interp, uintptr_t tid)
+{
+    return &interp->brc.table[tid % _Py_BRC_NUM_BUCKETS];
+}
+
+// Find the thread state in a hash table bucket by thread id.
+static _PyThreadStateImpl *
+find_thread_state(struct _brc_bucket *bucket, uintptr_t thread_id)
+{
+    struct llist_node *node;
+    llist_for_each(node, &bucket->root) {
+        // Get the containing _PyThreadStateImpl from the linked-list node.
+        _PyThreadStateImpl *ts = llist_data(node, _PyThreadStateImpl,
+                                            brc.bucket_node);
+        if (ts->brc.tid == thread_id) {
+            return ts;
+        }
+    }
+    return NULL;
+}
+
+// Enqueue an object to be merged by the owning thread. This steals a
+// reference to the object.
+void
+_Py_brc_queue_object(PyObject *ob)
+{
+    PyInterpreterState *interp = _PyInterpreterState_GET();
+
+    uintptr_t ob_tid = _Py_atomic_load_uintptr(&ob->ob_tid);
+    if (ob_tid == 0) {
+        // The owning thread may have concurrently decided to merge the
+        // refcount fields.
+        Py_DECREF(ob);
+        return;
+    }
+
+    struct _brc_bucket *bucket = get_bucket(interp, ob_tid);
+    PyMutex_Lock(&bucket->mutex);
+    _PyThreadStateImpl *tstate = find_thread_state(bucket, ob_tid);
+    if (tstate == NULL) {
+        // If we didn't find the owning thread then it must have already exited.
+        // It's safe (and necessary) to merge the refcount. Subtract one when
+        // merging because we've stolen a reference.
+        Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1);
+        PyMutex_Unlock(&bucket->mutex);
+        if (refcount == 0) {
+            _Py_Dealloc(ob);
+        }
+        return;
+    }
+
+    if (_PyObjectStack_Push(&tstate->brc.objects_to_merge, ob) < 0) {
+        PyMutex_Unlock(&bucket->mutex);
+
+        // Fall back to stopping all threads and manually merging the refcount
+        // if we can't enqueue the object to be merged.
+        _PyEval_StopTheWorld(interp);
+        Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1);
+        _PyEval_StartTheWorld(interp);
+
+        if (refcount == 0) {
+            _Py_Dealloc(ob);
+        }
+        return;
+    }
+
+    // Notify owning thread
+    _Py_set_eval_breaker_bit(interp, _PY_EVAL_EXPLICIT_MERGE_BIT, 1);
+
+    PyMutex_Unlock(&bucket->mutex);
+}
+
+static void
+merge_queued_objects(_PyObjectStack *to_merge)
+{
+    PyObject *ob;
+    while ((ob = _PyObjectStack_Pop(to_merge)) != NULL) {
+        // Subtract one when merging because the queue had a reference.
+        Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1);
+        if (refcount == 0) {
+            _Py_Dealloc(ob);
+        }
+    }
+}
+
+// Process this thread's queue of objects to merge.
+void
+_Py_brc_merge_refcounts(PyThreadState *tstate)
+{
+    struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc;
+    struct _brc_bucket *bucket = get_bucket(tstate->interp, brc->tid);
+
+    // Append all objects into a local stack. We don't want to hold the lock
+    // while calling destructors.
+    PyMutex_Lock(&bucket->mutex);
+    _PyObjectStack_Merge(&brc->local_objects_to_merge, &brc->objects_to_merge);
+    PyMutex_Unlock(&bucket->mutex);
+
+    // Process the local stack until it's empty
+    merge_queued_objects(&brc->local_objects_to_merge);
+}
+
+void
+_Py_brc_init_state(PyInterpreterState *interp)
+{
+    struct _brc_state *brc = &interp->brc;
+    for (Py_ssize_t i = 0; i < _Py_BRC_NUM_BUCKETS; i++) {
+        llist_init(&brc->table[i].root);
+    }
+}
+
+void
+_Py_brc_init_thread(PyThreadState *tstate)
+{
+    struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc;
+    brc->tid = _Py_ThreadId();
+
+    // Add ourselves to the hash table
+    struct _brc_bucket *bucket = get_bucket(tstate->interp, brc->tid);
+    PyMutex_Lock(&bucket->mutex);
+    llist_insert_tail(&bucket->root, &brc->bucket_node);
+    PyMutex_Unlock(&bucket->mutex);
+}
+
+void
+_Py_brc_remove_thread(PyThreadState *tstate)
+{
+    struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc;
+    struct _brc_bucket *bucket = get_bucket(tstate->interp, brc->tid);
+
+    // We need to fully process any objects to merge before removing ourselves
+    // from the hash table. It is not safe to perform any refcount operations
+    // after we are removed. After that point, other threads treat our objects
+    // as abandoned and may merge the objects' refcounts directly.
+    bool empty = false;
+    while (!empty) {
+        // Process the local stack until it's empty
+        merge_queued_objects(&brc->local_objects_to_merge);
+
+        PyMutex_Lock(&bucket->mutex);
+        empty = (brc->objects_to_merge.head == NULL);
+        if (empty) {
+            llist_remove(&brc->bucket_node);
+        }
+        else {
+            _PyObjectStack_Merge(&brc->local_objects_to_merge,
+                                 &brc->objects_to_merge);
+        }
+        PyMutex_Unlock(&bucket->mutex);
+    }
+
+    assert(brc->local_objects_to_merge.head == NULL);
+    assert(brc->objects_to_merge.head == NULL);
+}
+
+void
+_Py_brc_after_fork(PyInterpreterState *interp)
+{
+    // Unlock all bucket mutexes. Some of the buckets may be locked because
+    // locks can be handed off to a parked thread (see lock.c). We don't have
+    // to worry about consistency here, because no thread can be actively
+    // modifying a bucket, but it might be paused (not yet woken up) on a
+    // PyMutex_Lock while holding that lock.
+    for (Py_ssize_t i = 0; i < _Py_BRC_NUM_BUCKETS; i++) {
+        _PyMutex_at_fork_reinit(&interp->brc.table[i].mutex);
+    }
+}
+
+#endif  /* Py_GIL_DISABLED */
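
A quick standalone check of the prime-bucket comment in pycore_brc.h (toy
constants, not CPython code): thread ids that differ by a power-of-two
stride, as page-aligned allocations tend to, still spread across a 257-entry
table because 257 is prime:

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    #define TOY_NUM_BUCKETS 257

    int main(void)
    {
        for (int i = 0; i < 8; i++) {
            /* ids spaced one 4 KiB page apart */
            uintptr_t tid = 0x10000 + (uintptr_t)i * 4096;
            printf("tid=%#" PRIxPTR " -> bucket %" PRIuPTR "\n",
                   tid, tid % TOY_NUM_BUCKETS);
        }
        return 0;
    }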
diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c
index ad90359318761a5ab01b7bf38215f59f0f7648e8..deb9741291fca73220c1a17329459ed75017b597 100644 (file)
@@ -980,6 +980,14 @@ _Py_HandlePending(PyThreadState *tstate)
         }
     }
 
+#ifdef Py_GIL_DISABLED
+    /* Objects with refcounts to merge */
+    if (_Py_eval_breaker_bit_is_set(interp, _PY_EVAL_EXPLICIT_MERGE_BIT)) {
+        _Py_set_eval_breaker_bit(interp, _PY_EVAL_EXPLICIT_MERGE_BIT, 0);
+        _Py_brc_merge_refcounts(tstate);
+    }
+#endif
+
     /* GC scheduled to run */
     if (_Py_eval_breaker_bit_is_set(interp, _PY_GC_SCHEDULED_BIT)) {
         _Py_set_eval_breaker_bit(interp, _PY_GC_SCHEDULED_BIT, 0);
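
The handshake consumed above can be modeled in a few lines of C11 (toy code;
the bit position only mirrors _PY_EVAL_EXPLICIT_MERGE_BIT illustratively):
the queueing thread sets a bit, and the owning thread tests it, clears it,
and then merges its queue:

    #include <stdatomic.h>
    #include <stdio.h>

    #define TOY_EXPLICIT_MERGE_BIT (1u << 6)

    static atomic_uint toy_eval_breaker;

    int main(void)
    {
        /* Producer side (cf. _Py_brc_queue_object): request a merge. */
        atomic_fetch_or(&toy_eval_breaker, TOY_EXPLICIT_MERGE_BIT);

        /* Consumer side (cf. _Py_HandlePending): test, clear, act. */
        if (atomic_load(&toy_eval_breaker) & TOY_EXPLICIT_MERGE_BIT) {
            atomic_fetch_and(&toy_eval_breaker, ~TOY_EXPLICIT_MERGE_BIT);
            puts("merge queued refcounts now");
        }
        return 0;
    }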
diff --git a/Python/gc_free_threading.c b/Python/gc_free_threading.c
index 8fbcdb15109b76c747a6733166cf0957de7f465b..5d3b097dee93e871ac7da2948f8f3b7b8549a911 100644 (file)
@@ -1,5 +1,6 @@
 // Cyclic garbage collector implementation for free-threaded build.
 #include "Python.h"
+#include "pycore_brc.h"           // struct _brc_thread_state
 #include "pycore_ceval.h"         // _Py_set_eval_breaker_bit()
 #include "pycore_context.h"
 #include "pycore_dict.h"          // _PyDict_MaybeUntrack()
@@ -152,8 +153,7 @@ gc_decref(PyObject *op)
     op->ob_tid -= 1;
 }
 
-// Merge refcounts while the world is stopped.
-static void
+static Py_ssize_t
 merge_refcount(PyObject *op, Py_ssize_t extra)
 {
     assert(_PyInterpreterState_GET()->stoptheworld.world_stopped);
@@ -169,6 +169,7 @@ merge_refcount(PyObject *op, Py_ssize_t extra)
     op->ob_tid = 0;
     op->ob_ref_local = 0;
     op->ob_ref_shared = _Py_REF_SHARED(refcount, _Py_REF_MERGED);
+    return refcount;
 }
 
 static void
@@ -282,6 +283,41 @@ gc_visit_heaps(PyInterpreterState *interp, mi_block_visit_fun *visitor,
     return err;
 }
 
+static void
+merge_queued_objects(_PyThreadStateImpl *tstate, struct collection_state *state)
+{
+    struct _brc_thread_state *brc = &tstate->brc;
+    _PyObjectStack_Merge(&brc->local_objects_to_merge, &brc->objects_to_merge);
+
+    PyObject *op;
+    while ((op = _PyObjectStack_Pop(&brc->local_objects_to_merge)) != NULL) {
+        // Subtract one when merging because the queue had a reference.
+        Py_ssize_t refcount = merge_refcount(op, -1);
+
+        if (!_PyObject_GC_IS_TRACKED(op) && refcount == 0) {
+            // GC objects with zero refcount are handled subsequently by the
+            // GC as if they were cyclic trash, but we have to handle dead
+            // non-GC objects here. Add one to the refcount so that we can
+            // decref and deallocate the object once we start the world again.
+            op->ob_ref_shared += (1 << _Py_REF_SHARED_SHIFT);
+#ifdef Py_REF_DEBUG
+            _Py_IncRefTotal(_PyInterpreterState_GET());
+#endif
+            worklist_push(&state->objs_to_decref, op);
+        }
+    }
+}
+
+static void
+merge_all_queued_objects(PyInterpreterState *interp, struct collection_state *state)
+{
+    HEAD_LOCK(&_PyRuntime);
+    for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
+        merge_queued_objects((_PyThreadStateImpl *)p, state);
+    }
+    HEAD_UNLOCK(&_PyRuntime);
+}
+
 // Subtract an incoming reference from the computed "gc_refs" refcount.
 static int
 visit_decref(PyObject *op, void *arg)
@@ -927,6 +963,9 @@ static void
 gc_collect_internal(PyInterpreterState *interp, struct collection_state *state)
 {
     _PyEval_StopTheWorld(interp);
+    // merge refcounts for all queued objects
+    merge_all_queued_objects(interp, state);
+
     // Find unreachable objects
     int err = deduce_unreachable_heap(interp, state);
     if (err < 0) {
@@ -946,6 +985,9 @@ gc_collect_internal(PyInterpreterState *interp, struct collection_state *state)
     clear_weakrefs(state);
     _PyEval_StartTheWorld(interp);
 
+    // Deallocate any object from the refcount merge step
+    cleanup_worklist(&state->objs_to_decref);
+
     // Call weakref callbacks and finalizers after unpausing other threads to
     // avoid potential deadlocks.
     call_weakref_callbacks(state);
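
The objs_to_decref worklist exists so that destructors never run while the
world is stopped. A toy sketch of that defer-then-free pattern (not CPython
code):

    #include <stdio.h>

    #define TOY_WORKLIST_MAX 16

    static const char *toy_worklist[TOY_WORKLIST_MAX];
    static int toy_worklist_len;

    /* While all threads are paused, only record the dead object. */
    static void toy_defer_free(const char *name)
    {
        if (toy_worklist_len < TOY_WORKLIST_MAX) {
            toy_worklist[toy_worklist_len++] = name;
        }
    }

    int main(void)
    {
        toy_defer_free("dead non-GC object");   /* world stopped */
        for (int i = 0; i < toy_worklist_len; i++) {
            printf("dealloc %s\n", toy_worklist[i]);  /* world restarted */
        }
        return 0;
    }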
diff --git a/Python/object_stack.c b/Python/object_stack.c
index 8544892eb71dcb62eec024a7e636c7895e7bdf02..ced4460da00f4427e94f9254eeae7848ec73578a 100644 (file)
@@ -67,6 +67,27 @@ _PyObjectStack_Clear(_PyObjectStack *queue)
     }
 }
 
+void
+_PyObjectStack_Merge(_PyObjectStack *dst, _PyObjectStack *src)
+{
+    if (src->head == NULL) {
+        return;
+    }
+
+    if (dst->head != NULL) {
+        // First, append dst to the bottom of src
+        _PyObjectStackChunk *last = src->head;
+        while (last->prev != NULL) {
+            last = last->prev;
+        }
+        last->prev = dst->head;
+    }
+
+    // Now that src has all the chunks, set dst to src
+    dst->head = src->head;
+    src->head = NULL;
+}
+
 void
 _PyObjectStackChunk_ClearFreeList(_PyFreeListState *free_lists, int is_finalization)
 {
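
The splice in _PyObjectStack_Merge walks to src's bottom chunk, hangs dst's
chain beneath it, and then moves the head pointer, leaving src empty. A
self-contained toy model of the same splice (toy types, not CPython):

    #include <assert.h>
    #include <stddef.h>

    typedef struct toy_chunk { struct toy_chunk *prev; } toy_chunk;

    static void toy_merge(toy_chunk **dst, toy_chunk **src)
    {
        if (*src == NULL) {
            return;
        }
        if (*dst != NULL) {
            toy_chunk *last = *src;
            while (last->prev != NULL) {
                last = last->prev;      /* find the bottom chunk of src */
            }
            last->prev = *dst;          /* append dst's chain below it */
        }
        *dst = *src;    /* src's head becomes the merged head */
        *src = NULL;    /* leave src empty, per the documented contract */
    }

    int main(void)
    {
        toy_chunk a = { NULL }, b = { &a }, c = { NULL };
        toy_chunk *dst = &b, *src = &c;     /* dst: b -> a, src: c */
        toy_merge(&dst, &src);
        assert(dst == &c && c.prev == &b && src == NULL);
        return 0;
    }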
diff --git a/Python/pystate.c b/Python/pystate.c
index e77e5bfa7e2df86a770499520022823b9950caf2..6cd034743ddf4c325e7660bd1bedc56019e8ec69 100644 (file)
@@ -611,6 +611,9 @@ init_interpreter(PyInterpreterState *interp,
     _PyGC_InitState(&interp->gc);
     PyConfig_InitPythonConfig(&interp->config);
     _PyType_InitCache(interp);
+#ifdef Py_GIL_DISABLED
+    _Py_brc_init_state(interp);
+#endif
     for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) {
         interp->monitors.tools[i] = 0;
     }
@@ -1336,6 +1339,11 @@ init_threadstate(_PyThreadStateImpl *_tstate,
     tstate->datastack_limit = NULL;
     tstate->what_event = -1;
 
+#ifdef Py_GIL_DISABLED
+    // Initialize biased reference counting inter-thread queue
+    _Py_brc_init_thread(tstate);
+#endif
+
     if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
         // Start in the suspended state if there is an ongoing stop-the-world.
         tstate->state = _Py_THREAD_SUSPENDED;
@@ -1561,6 +1569,9 @@ PyThreadState_Clear(PyThreadState *tstate)
     _PyFreeListState *freelist_state = &((_PyThreadStateImpl*)tstate)->freelist_state;
     _Py_ClearFreeLists(freelist_state, 1);
     _PySlice_ClearCache(freelist_state);
+
+    // Remove ourselves from the biased reference counting table of threads.
+    _Py_brc_remove_thread(tstate);
 #endif
 
     _PyThreadState_ClearMimallocHeaps(tstate);