In free-threaded builds, concurrent calls to PyDict_AddWatcher, PyDict_ClearWatcher, PyDict_Watch, and PyDict_Unwatch can race on the shared callback array and the per-dict watcher tags. This change adds a mutex to serialize watcher registration and removal, atomic operations for tag updates, and atomic acquire/release synchronization for callback dispatch in _PyDict_SendEvent.
struct _Py_dict_state {
uint32_t next_keys_version;
+ PyMutex watcher_mutex; // Protects the watchers array (free-threaded builds)
+ _PyOnceFlag watcher_setup_once; // One-time optimizer watcher setup
PyDict_WatchCallback watchers[DICT_MAX_WATCHERS];
};
#define FT_ATOMIC_ADD_SSIZE(value, new_value) \
(void)_Py_atomic_add_ssize(&value, new_value)
#define FT_MUTEX_LOCK(lock) PyMutex_Lock(lock)
+#define FT_MUTEX_LOCK_FLAGS(lock, flags) PyMutex_LockFlags(lock, flags)
#define FT_MUTEX_UNLOCK(lock) PyMutex_Unlock(lock)
#else
#define FT_ATOMIC_STORE_ULLONG_RELAXED(value, new_value) value = new_value
#define FT_ATOMIC_ADD_SSIZE(value, new_value) (void)(value += new_value)
#define FT_MUTEX_LOCK(lock) do {} while (0)
+#define FT_MUTEX_LOCK_FLAGS(lock, flags) do {} while (0)
#define FT_MUTEX_UNLOCK(lock) do {} while (0)
#endif
--- /dev/null
+import unittest
+
+from test.support import import_helper, threading_helper
+
+_testcapi = import_helper.import_module("_testcapi")
+
+ITERS = 100
+NTHREADS = 20
+
+
+@threading_helper.requires_working_threading()
+class TestDictWatcherThreadSafety(unittest.TestCase):
+ # Watcher kinds from _testcapi
+ EVENTS = 0 # appends dict events as strings to global event list
+
+ def test_concurrent_add_clear_watchers(self):
+ """Race AddWatcher and ClearWatcher from multiple threads.
+
+ Uses more threads than available watcher slots (5 user slots out
+ of DICT_MAX_WATCHERS=8).
+ """
+ results = []
+
+ def worker():
+ for _ in range(ITERS):
+ try:
+ wid = _testcapi.add_dict_watcher(self.EVENTS)
+ except RuntimeError:
+ continue # All slots taken
+ self.assertGreaterEqual(wid, 0)
+ results.append(wid)
+ _testcapi.clear_dict_watcher(wid)
+
+ threading_helper.run_concurrently(worker, NTHREADS)
+
+ # Verify at least some watchers were successfully added
+ self.assertGreater(len(results), 0)
+
+ def test_concurrent_watch_unwatch(self):
+ """Race Watch and Unwatch on the same dict from multiple threads."""
+ wid = _testcapi.add_dict_watcher(self.EVENTS)
+ dicts = [{} for _ in range(10)]
+
+ def worker():
+ for _ in range(ITERS):
+ for d in dicts:
+ _testcapi.watch_dict(wid, d)
+ for d in dicts:
+ _testcapi.unwatch_dict(wid, d)
+
+ try:
+ threading_helper.run_concurrently(worker, NTHREADS)
+
+ # Verify watching still works after concurrent watch/unwatch
+ _testcapi.watch_dict(wid, dicts[0])
+ dicts[0]["key"] = "value"
+ events = _testcapi.get_dict_watcher_events()
+ self.assertIn("new:key:value", events)
+ finally:
+ _testcapi.clear_dict_watcher(wid)
+
+ def test_concurrent_modify_watched_dict(self):
+ """Race dict mutations (triggering callbacks) with watch/unwatch."""
+ wid = _testcapi.add_dict_watcher(self.EVENTS)
+ d = {}
+ _testcapi.watch_dict(wid, d)
+
+ def mutator():
+ for i in range(ITERS):
+ d[f"key_{i}"] = i
+ d.pop(f"key_{i}", None)
+
+ def toggler():
+ for i in range(ITERS):
+ _testcapi.watch_dict(wid, d)
+ d[f"toggler_{i}"] = i
+ _testcapi.unwatch_dict(wid, d)
+
+ workers = [mutator, toggler] * (NTHREADS // 2)
+ try:
+ threading_helper.run_concurrently(workers)
+ events = _testcapi.get_dict_watcher_events()
+ self.assertGreater(len(events), 0)
+ finally:
+ _testcapi.clear_dict_watcher(wid)
+
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+Made :c:func:`PyDict_AddWatcher`, :c:func:`PyDict_ClearWatcher`,
+:c:func:`PyDict_Watch`, and :c:func:`PyDict_Unwatch` thread-safe on the
+:term:`free threaded <free threading>` build.
#include "pycore_function.h" // FUNC_MAX_WATCHERS
#include "pycore_interp_structs.h" // CODE_MAX_WATCHERS
#include "pycore_context.h" // CONTEXT_MAX_WATCHERS
+#include "pycore_lock.h" // _PyOnceFlag
/*[clinic input]
module _testcapi
// Test dict watching
static PyObject *g_dict_watch_events = NULL;
static int g_dict_watchers_installed = 0;
+static _PyOnceFlag g_dict_watch_once = {0};
+
+static int
+_init_dict_watch_events(void *arg)
+{
+ g_dict_watch_events = PyList_New(0);
+ return g_dict_watch_events ? 0 : -1;
+}
static int
dict_watch_callback(PyDict_WatchEvent event,
if (watcher_id < 0) {
return NULL;
}
- if (!g_dict_watchers_installed) {
- assert(!g_dict_watch_events);
- if (!(g_dict_watch_events = PyList_New(0))) {
- return NULL;
- }
+ if (_PyOnceFlag_CallOnce(&g_dict_watch_once, _init_dict_watch_events, NULL) < 0) {
+ return NULL;
}
- g_dict_watchers_installed++;
+ _Py_atomic_add_int(&g_dict_watchers_installed, 1);
return PyLong_FromLong(watcher_id);
}
if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
return NULL;
}
- g_dict_watchers_installed--;
- if (!g_dict_watchers_installed) {
- assert(g_dict_watch_events);
- Py_CLEAR(g_dict_watch_events);
+ if (_Py_atomic_add_int(&g_dict_watchers_installed, -1) == 1) {
+ PyList_Clear(g_dict_watch_events);
}
Py_RETURN_NONE;
}
static PyObject *
get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
{
- if (!g_dict_watch_events) {
+ if (_Py_atomic_load_int(&g_dict_watchers_installed) <= 0) {
PyErr_SetString(PyExc_RuntimeError, "no watchers active");
return NULL;
}
PyErr_Format(PyExc_ValueError, "Invalid dict watcher ID %d", watcher_id);
return -1;
}
- if (!interp->dict_state.watchers[watcher_id]) {
+ PyDict_WatchCallback cb = FT_ATOMIC_LOAD_PTR_RELAXED(
+ interp->dict_state.watchers[watcher_id]);
+ if (cb == NULL) {
PyErr_Format(PyExc_ValueError, "No dict watcher set for ID %d", watcher_id);
return -1;
}
return 0;
}
+// In free-threaded builds, Add/Clear serialize on watcher_mutex and publish
+// callbacks with release stores. SendEvent reads them lock-free using
+// acquire loads.
+
int
PyDict_Watch(int watcher_id, PyObject* dict)
{
if (validate_watcher_id(interp, watcher_id)) {
return -1;
}
- FT_ATOMIC_OR_UINT64(((PyDictObject*)dict)->_ma_watcher_tag, (1LL << watcher_id));
+ FT_ATOMIC_OR_UINT64(((PyDictObject*)dict)->_ma_watcher_tag,
+ 1ULL << watcher_id);
return 0;
}
if (validate_watcher_id(interp, watcher_id)) {
return -1;
}
- FT_ATOMIC_AND_UINT64(((PyDictObject*)dict)->_ma_watcher_tag, ~(1LL << watcher_id));
+ FT_ATOMIC_AND_UINT64(((PyDictObject*)dict)->_ma_watcher_tag,
+ ~(1ULL << watcher_id));
return 0;
}
int
PyDict_AddWatcher(PyDict_WatchCallback callback)
{
+ int watcher_id = -1;
PyInterpreterState *interp = _PyInterpreterState_GET();
+ FT_MUTEX_LOCK_FLAGS(&interp->dict_state.watcher_mutex,
+ _Py_LOCK_DONT_DETACH);
/* Some watchers are reserved for CPython, start at the first available one */
for (int i = FIRST_AVAILABLE_WATCHER; i < DICT_MAX_WATCHERS; i++) {
if (!interp->dict_state.watchers[i]) {
- interp->dict_state.watchers[i] = callback;
- return i;
+ FT_ATOMIC_STORE_PTR_RELEASE(interp->dict_state.watchers[i], callback);
+ watcher_id = i;
+ goto done;
}
}
-
PyErr_SetString(PyExc_RuntimeError, "no more dict watcher IDs available");
- return -1;
+done:
+ FT_MUTEX_UNLOCK(&interp->dict_state.watcher_mutex);
+ return watcher_id;
}
int
PyDict_ClearWatcher(int watcher_id)
{
+ int res = 0;
PyInterpreterState *interp = _PyInterpreterState_GET();
+ FT_MUTEX_LOCK_FLAGS(&interp->dict_state.watcher_mutex,
+ _Py_LOCK_DONT_DETACH);
if (validate_watcher_id(interp, watcher_id)) {
- return -1;
+ res = -1;
+ goto done;
}
- interp->dict_state.watchers[watcher_id] = NULL;
- return 0;
+ FT_ATOMIC_STORE_PTR_RELEASE(interp->dict_state.watchers[watcher_id], NULL);
+done:
+ FT_MUTEX_UNLOCK(&interp->dict_state.watcher_mutex);
+ return res;
}
static const char *
PyInterpreterState *interp = _PyInterpreterState_GET();
for (int i = 0; i < DICT_MAX_WATCHERS; i++) {
if (watcher_bits & 1) {
- PyDict_WatchCallback cb = interp->dict_state.watchers[i];
+ PyDict_WatchCallback cb = FT_ATOMIC_LOAD_PTR_ACQUIRE(
+ interp->dict_state.watchers[i]);
if (cb && (cb(event, (PyObject*)mp, key, value) < 0)) {
// We don't want to resurrect the dict by potentially having an
// unraisablehook keep a reference to it, so we don't pass the
#include "pycore_opcode_metadata.h"
#include "pycore_opcode_utils.h"
#include "pycore_pystate.h" // _PyInterpreterState_GET()
+#include "pycore_pyatomic_ft_wrappers.h" // FT_ATOMIC_*
#include "pycore_tstate.h" // _PyThreadStateImpl
#include "pycore_uop_metadata.h"
#include "pycore_long.h"
increment_mutations(PyObject* dict) {
assert(PyDict_CheckExact(dict));
PyDictObject *d = (PyDictObject *)dict;
- FT_ATOMIC_ADD_UINT64(d->_ma_watcher_tag, (1 << DICT_MAX_WATCHERS));
+ FT_ATOMIC_ADD_UINT64(d->_ma_watcher_tag, 1ULL << DICT_MAX_WATCHERS);
}
/* The first two dict watcher IDs are reserved for CPython,
return 0;
}
+static int
+_setup_optimizer_watchers(void *Py_UNUSED(arg))
+{
+ PyInterpreterState *interp = _PyInterpreterState_GET();
+ FT_ATOMIC_STORE_PTR_RELEASE(
+ interp->dict_state.watchers[GLOBALS_WATCHER_ID],
+ globals_watcher_callback);
+ interp->type_watchers[TYPE_WATCHER_ID] = type_watcher_callback;
+ return 0;
+}
+
static void
watch_type(PyTypeObject *type, _PyBloomFilter *filter)
{
// Make sure that watchers are set up
PyInterpreterState *interp = _PyInterpreterState_GET();
- if (interp->dict_state.watchers[GLOBALS_WATCHER_ID] == NULL) {
- interp->dict_state.watchers[GLOBALS_WATCHER_ID] = globals_watcher_callback;
- interp->type_watchers[TYPE_WATCHER_ID] = type_watcher_callback;
- }
+ _PyOnceFlag_CallOnce(&interp->dict_state.watcher_setup_once,
+ _setup_optimizer_watchers, NULL);
_Py_uop_abstractcontext_init(ctx, dependencies);
_Py_UOpsAbstractFrame *frame = _Py_uop_frame_new(ctx, (PyCodeObject *)func->func_code, NULL, 0);
&(runtime)->allocators.mutex, \
&(runtime)->_main_interpreter.types.mutex, \
&(runtime)->_main_interpreter.code_state.mutex, \
+ &(runtime)->_main_interpreter.dict_state.watcher_mutex, \
}
static void
Modules/_testcapi/object.c - MyType -
Modules/_testcapi/structmember.c - test_structmembersType_OldAPI -
Modules/_testcapi/watchers.c - g_dict_watch_events -
+Modules/_testcapi/watchers.c - g_dict_watch_once -
Modules/_testcapi/watchers.c - g_dict_watchers_installed -
Modules/_testcapi/watchers.c - g_type_modified_events -
Modules/_testcapi/watchers.c - g_type_watchers_installed -