]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-93649: Split watcher API tests from _testcapimodule.c (#99532)
authorErlend E. Aasland <erlend.aasland@protonmail.com>
Wed, 16 Nov 2022 19:13:32 +0000 (20:13 +0100)
committerGitHub <noreply@github.com>
Wed, 16 Nov 2022 19:13:32 +0000 (20:13 +0100)
Lib/test/test_capi/test_misc.py
Lib/test/test_capi/test_watchers.py [new file with mode: 0644]
Modules/Setup.stdlib.in
Modules/_testcapi/parts.h
Modules/_testcapi/watchers.c [new file with mode: 0644]
Modules/_testcapimodule.c
PCbuild/_testcapi.vcxproj
PCbuild/_testcapi.vcxproj.filters

index ea4c9de47d73def07490a2d99bf7a141e0e219f0..1d30adaee9218fcff8fe19cf594e609c9ae8a59a 100644 (file)
@@ -2,7 +2,6 @@
 # these are all functions _testcapi exports whose name begins with 'test_'.
 
 from collections import OrderedDict
-from contextlib import contextmanager, ExitStack
 import _thread
 import importlib.machinery
 import importlib.util
@@ -20,7 +19,6 @@ import warnings
 import weakref
 from test import support
 from test.support import MISSING_C_DOCSTRINGS
-from test.support import catch_unraisable_exception
 from test.support import import_helper
 from test.support import threading_helper
 from test.support import warnings_helper
@@ -1705,333 +1703,5 @@ class Test_Pep523API(unittest.TestCase):
         self.do_test(func2)
 
 
-class TestDictWatchers(unittest.TestCase):
-    # types of watchers testcapimodule can add:
-    EVENTS = 0   # appends dict events as strings to global event list
-    ERROR = 1    # unconditionally sets and signals a RuntimeException
-    SECOND = 2   # always appends "second" to global event list
-
-    def add_watcher(self, kind=EVENTS):
-        return _testcapi.add_dict_watcher(kind)
-
-    def clear_watcher(self, watcher_id):
-        _testcapi.clear_dict_watcher(watcher_id)
-
-    @contextmanager
-    def watcher(self, kind=EVENTS):
-        wid = self.add_watcher(kind)
-        try:
-            yield wid
-        finally:
-            self.clear_watcher(wid)
-
-    def assert_events(self, expected):
-        actual = _testcapi.get_dict_watcher_events()
-        self.assertEqual(actual, expected)
-
-    def watch(self, wid, d):
-        _testcapi.watch_dict(wid, d)
-
-    def unwatch(self, wid, d):
-        _testcapi.unwatch_dict(wid, d)
-
-    def test_set_new_item(self):
-        d = {}
-        with self.watcher() as wid:
-            self.watch(wid, d)
-            d["foo"] = "bar"
-            self.assert_events(["new:foo:bar"])
-
-    def test_set_existing_item(self):
-        d = {"foo": "bar"}
-        with self.watcher() as wid:
-            self.watch(wid, d)
-            d["foo"] = "baz"
-            self.assert_events(["mod:foo:baz"])
-
-    def test_clone(self):
-        d = {}
-        d2 = {"foo": "bar"}
-        with self.watcher() as wid:
-            self.watch(wid, d)
-            d.update(d2)
-            self.assert_events(["clone"])
-
-    def test_no_event_if_not_watched(self):
-        d = {}
-        with self.watcher() as wid:
-            d["foo"] = "bar"
-            self.assert_events([])
-
-    def test_del(self):
-        d = {"foo": "bar"}
-        with self.watcher() as wid:
-            self.watch(wid, d)
-            del d["foo"]
-            self.assert_events(["del:foo"])
-
-    def test_pop(self):
-        d = {"foo": "bar"}
-        with self.watcher() as wid:
-            self.watch(wid, d)
-            d.pop("foo")
-            self.assert_events(["del:foo"])
-
-    def test_clear(self):
-        d = {"foo": "bar"}
-        with self.watcher() as wid:
-            self.watch(wid, d)
-            d.clear()
-            self.assert_events(["clear"])
-
-    def test_dealloc(self):
-        d = {"foo": "bar"}
-        with self.watcher() as wid:
-            self.watch(wid, d)
-            del d
-            self.assert_events(["dealloc"])
-
-    def test_unwatch(self):
-        d = {}
-        with self.watcher() as wid:
-            self.watch(wid, d)
-            d["foo"] = "bar"
-            self.unwatch(wid, d)
-            d["hmm"] = "baz"
-            self.assert_events(["new:foo:bar"])
-
-    def test_error(self):
-        d = {}
-        with self.watcher(kind=self.ERROR) as wid:
-            self.watch(wid, d)
-            with catch_unraisable_exception() as cm:
-                d["foo"] = "bar"
-                self.assertIs(cm.unraisable.object, d)
-                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
-            self.assert_events([])
-
-    def test_two_watchers(self):
-        d1 = {}
-        d2 = {}
-        with self.watcher() as wid1:
-            with self.watcher(kind=self.SECOND) as wid2:
-                self.watch(wid1, d1)
-                self.watch(wid2, d2)
-                d1["foo"] = "bar"
-                d2["hmm"] = "baz"
-                self.assert_events(["new:foo:bar", "second"])
-
-    def test_watch_non_dict(self):
-        with self.watcher() as wid:
-            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
-                self.watch(wid, 1)
-
-    def test_watch_out_of_range_watcher_id(self):
-        d = {}
-        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
-            self.watch(-1, d)
-        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
-            self.watch(8, d)  # DICT_MAX_WATCHERS = 8
-
-    def test_watch_unassigned_watcher_id(self):
-        d = {}
-        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
-            self.watch(1, d)
-
-    def test_unwatch_non_dict(self):
-        with self.watcher() as wid:
-            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
-                self.unwatch(wid, 1)
-
-    def test_unwatch_out_of_range_watcher_id(self):
-        d = {}
-        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
-            self.unwatch(-1, d)
-        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
-            self.unwatch(8, d)  # DICT_MAX_WATCHERS = 8
-
-    def test_unwatch_unassigned_watcher_id(self):
-        d = {}
-        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
-            self.unwatch(1, d)
-
-    def test_clear_out_of_range_watcher_id(self):
-        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
-            self.clear_watcher(-1)
-        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
-            self.clear_watcher(8)  # DICT_MAX_WATCHERS = 8
-
-    def test_clear_unassigned_watcher_id(self):
-        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
-            self.clear_watcher(1)
-
-
-class TestTypeWatchers(unittest.TestCase):
-    # types of watchers testcapimodule can add:
-    TYPES = 0    # appends modified types to global event list
-    ERROR = 1    # unconditionally sets and signals a RuntimeException
-    WRAP = 2     # appends modified type wrapped in list to global event list
-
-    # duplicating the C constant
-    TYPE_MAX_WATCHERS = 8
-
-    def add_watcher(self, kind=TYPES):
-        return _testcapi.add_type_watcher(kind)
-
-    def clear_watcher(self, watcher_id):
-        _testcapi.clear_type_watcher(watcher_id)
-
-    @contextmanager
-    def watcher(self, kind=TYPES):
-        wid = self.add_watcher(kind)
-        try:
-            yield wid
-        finally:
-            self.clear_watcher(wid)
-
-    def assert_events(self, expected):
-        actual = _testcapi.get_type_modified_events()
-        self.assertEqual(actual, expected)
-
-    def watch(self, wid, t):
-        _testcapi.watch_type(wid, t)
-
-    def unwatch(self, wid, t):
-        _testcapi.unwatch_type(wid, t)
-
-    def test_watch_type(self):
-        class C: pass
-        with self.watcher() as wid:
-            self.watch(wid, C)
-            C.foo = "bar"
-            self.assert_events([C])
-
-    def test_event_aggregation(self):
-        class C: pass
-        with self.watcher() as wid:
-            self.watch(wid, C)
-            C.foo = "bar"
-            C.bar = "baz"
-            # only one event registered for both modifications
-            self.assert_events([C])
-
-    def test_lookup_resets_aggregation(self):
-        class C: pass
-        with self.watcher() as wid:
-            self.watch(wid, C)
-            C.foo = "bar"
-            # lookup resets type version tag
-            self.assertEqual(C.foo, "bar")
-            C.bar = "baz"
-            # both events registered
-            self.assert_events([C, C])
-
-    def test_unwatch_type(self):
-        class C: pass
-        with self.watcher() as wid:
-            self.watch(wid, C)
-            C.foo = "bar"
-            self.assertEqual(C.foo, "bar")
-            self.assert_events([C])
-            self.unwatch(wid, C)
-            C.bar = "baz"
-            self.assert_events([C])
-
-    def test_clear_watcher(self):
-        class C: pass
-        # outer watcher is unused, it's just to keep events list alive
-        with self.watcher() as _:
-            with self.watcher() as wid:
-                self.watch(wid, C)
-                C.foo = "bar"
-                self.assertEqual(C.foo, "bar")
-                self.assert_events([C])
-            C.bar = "baz"
-            # Watcher on C has been cleared, no new event
-            self.assert_events([C])
-
-    def test_watch_type_subclass(self):
-        class C: pass
-        class D(C): pass
-        with self.watcher() as wid:
-            self.watch(wid, D)
-            C.foo = "bar"
-            self.assert_events([D])
-
-    def test_error(self):
-        class C: pass
-        with self.watcher(kind=self.ERROR) as wid:
-            self.watch(wid, C)
-            with catch_unraisable_exception() as cm:
-                C.foo = "bar"
-                self.assertIs(cm.unraisable.object, C)
-                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
-            self.assert_events([])
-
-    def test_two_watchers(self):
-        class C1: pass
-        class C2: pass
-        with self.watcher() as wid1:
-            with self.watcher(kind=self.WRAP) as wid2:
-                self.assertNotEqual(wid1, wid2)
-                self.watch(wid1, C1)
-                self.watch(wid2, C2)
-                C1.foo = "bar"
-                C2.hmm = "baz"
-                self.assert_events([C1, [C2]])
-
-    def test_watch_non_type(self):
-        with self.watcher() as wid:
-            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
-                self.watch(wid, 1)
-
-    def test_watch_out_of_range_watcher_id(self):
-        class C: pass
-        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
-            self.watch(-1, C)
-        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
-            self.watch(self.TYPE_MAX_WATCHERS, C)
-
-    def test_watch_unassigned_watcher_id(self):
-        class C: pass
-        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
-            self.watch(1, C)
-
-    def test_unwatch_non_type(self):
-        with self.watcher() as wid:
-            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
-                self.unwatch(wid, 1)
-
-    def test_unwatch_out_of_range_watcher_id(self):
-        class C: pass
-        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
-            self.unwatch(-1, C)
-        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
-            self.unwatch(self.TYPE_MAX_WATCHERS, C)
-
-    def test_unwatch_unassigned_watcher_id(self):
-        class C: pass
-        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
-            self.unwatch(1, C)
-
-    def test_clear_out_of_range_watcher_id(self):
-        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
-            self.clear_watcher(-1)
-        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
-            self.clear_watcher(self.TYPE_MAX_WATCHERS)
-
-    def test_clear_unassigned_watcher_id(self):
-        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
-            self.clear_watcher(1)
-
-    def test_no_more_ids_available(self):
-        contexts = [self.watcher() for i in range(self.TYPE_MAX_WATCHERS)]
-        with ExitStack() as stack:
-            for ctx in contexts:
-                stack.enter_context(ctx)
-            with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
-                self.add_watcher()
-
-
 if __name__ == "__main__":
     unittest.main()
diff --git a/Lib/test/test_capi/test_watchers.py b/Lib/test/test_capi/test_watchers.py
new file mode 100644 (file)
index 0000000..f635c74
--- /dev/null
@@ -0,0 +1,340 @@
+import unittest
+
+from contextlib import contextmanager, ExitStack
+from test.support import catch_unraisable_exception, import_helper
+
+
+# Skip this test if the _testcapi module isn't available.
+_testcapi = import_helper.import_module('_testcapi')
+
+
+class TestDictWatchers(unittest.TestCase):
+    # types of watchers testcapimodule can add:
+    EVENTS = 0   # appends dict events as strings to global event list
+    ERROR = 1    # unconditionally sets and signals a RuntimeException
+    SECOND = 2   # always appends "second" to global event list
+
+    def add_watcher(self, kind=EVENTS):
+        return _testcapi.add_dict_watcher(kind)
+
+    def clear_watcher(self, watcher_id):
+        _testcapi.clear_dict_watcher(watcher_id)
+
+    @contextmanager
+    def watcher(self, kind=EVENTS):
+        wid = self.add_watcher(kind)
+        try:
+            yield wid
+        finally:
+            self.clear_watcher(wid)
+
+    def assert_events(self, expected):
+        actual = _testcapi.get_dict_watcher_events()
+        self.assertEqual(actual, expected)
+
+    def watch(self, wid, d):
+        _testcapi.watch_dict(wid, d)
+
+    def unwatch(self, wid, d):
+        _testcapi.unwatch_dict(wid, d)
+
+    def test_set_new_item(self):
+        d = {}
+        with self.watcher() as wid:
+            self.watch(wid, d)
+            d["foo"] = "bar"
+            self.assert_events(["new:foo:bar"])
+
+    def test_set_existing_item(self):
+        d = {"foo": "bar"}
+        with self.watcher() as wid:
+            self.watch(wid, d)
+            d["foo"] = "baz"
+            self.assert_events(["mod:foo:baz"])
+
+    def test_clone(self):
+        d = {}
+        d2 = {"foo": "bar"}
+        with self.watcher() as wid:
+            self.watch(wid, d)
+            d.update(d2)
+            self.assert_events(["clone"])
+
+    def test_no_event_if_not_watched(self):
+        d = {}
+        with self.watcher() as wid:
+            d["foo"] = "bar"
+            self.assert_events([])
+
+    def test_del(self):
+        d = {"foo": "bar"}
+        with self.watcher() as wid:
+            self.watch(wid, d)
+            del d["foo"]
+            self.assert_events(["del:foo"])
+
+    def test_pop(self):
+        d = {"foo": "bar"}
+        with self.watcher() as wid:
+            self.watch(wid, d)
+            d.pop("foo")
+            self.assert_events(["del:foo"])
+
+    def test_clear(self):
+        d = {"foo": "bar"}
+        with self.watcher() as wid:
+            self.watch(wid, d)
+            d.clear()
+            self.assert_events(["clear"])
+
+    def test_dealloc(self):
+        d = {"foo": "bar"}
+        with self.watcher() as wid:
+            self.watch(wid, d)
+            del d
+            self.assert_events(["dealloc"])
+
+    def test_unwatch(self):
+        d = {}
+        with self.watcher() as wid:
+            self.watch(wid, d)
+            d["foo"] = "bar"
+            self.unwatch(wid, d)
+            d["hmm"] = "baz"
+            self.assert_events(["new:foo:bar"])
+
+    def test_error(self):
+        d = {}
+        with self.watcher(kind=self.ERROR) as wid:
+            self.watch(wid, d)
+            with catch_unraisable_exception() as cm:
+                d["foo"] = "bar"
+                self.assertIs(cm.unraisable.object, d)
+                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
+            self.assert_events([])
+
+    def test_two_watchers(self):
+        d1 = {}
+        d2 = {}
+        with self.watcher() as wid1:
+            with self.watcher(kind=self.SECOND) as wid2:
+                self.watch(wid1, d1)
+                self.watch(wid2, d2)
+                d1["foo"] = "bar"
+                d2["hmm"] = "baz"
+                self.assert_events(["new:foo:bar", "second"])
+
+    def test_watch_non_dict(self):
+        with self.watcher() as wid:
+            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
+                self.watch(wid, 1)
+
+    def test_watch_out_of_range_watcher_id(self):
+        d = {}
+        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
+            self.watch(-1, d)
+        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
+            self.watch(8, d)  # DICT_MAX_WATCHERS = 8
+
+    def test_watch_unassigned_watcher_id(self):
+        d = {}
+        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
+            self.watch(1, d)
+
+    def test_unwatch_non_dict(self):
+        with self.watcher() as wid:
+            with self.assertRaisesRegex(ValueError, r"Cannot watch non-dictionary"):
+                self.unwatch(wid, 1)
+
+    def test_unwatch_out_of_range_watcher_id(self):
+        d = {}
+        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
+            self.unwatch(-1, d)
+        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
+            self.unwatch(8, d)  # DICT_MAX_WATCHERS = 8
+
+    def test_unwatch_unassigned_watcher_id(self):
+        d = {}
+        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
+            self.unwatch(1, d)
+
+    def test_clear_out_of_range_watcher_id(self):
+        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID -1"):
+            self.clear_watcher(-1)
+        with self.assertRaisesRegex(ValueError, r"Invalid dict watcher ID 8"):
+            self.clear_watcher(8)  # DICT_MAX_WATCHERS = 8
+
+    def test_clear_unassigned_watcher_id(self):
+        with self.assertRaisesRegex(ValueError, r"No dict watcher set for ID 1"):
+            self.clear_watcher(1)
+
+
+class TestTypeWatchers(unittest.TestCase):
+    # types of watchers testcapimodule can add:
+    TYPES = 0    # appends modified types to global event list
+    ERROR = 1    # unconditionally sets and signals a RuntimeException
+    WRAP = 2     # appends modified type wrapped in list to global event list
+
+    # duplicating the C constant
+    TYPE_MAX_WATCHERS = 8
+
+    def add_watcher(self, kind=TYPES):
+        return _testcapi.add_type_watcher(kind)
+
+    def clear_watcher(self, watcher_id):
+        _testcapi.clear_type_watcher(watcher_id)
+
+    @contextmanager
+    def watcher(self, kind=TYPES):
+        wid = self.add_watcher(kind)
+        try:
+            yield wid
+        finally:
+            self.clear_watcher(wid)
+
+    def assert_events(self, expected):
+        actual = _testcapi.get_type_modified_events()
+        self.assertEqual(actual, expected)
+
+    def watch(self, wid, t):
+        _testcapi.watch_type(wid, t)
+
+    def unwatch(self, wid, t):
+        _testcapi.unwatch_type(wid, t)
+
+    def test_watch_type(self):
+        class C: pass
+        with self.watcher() as wid:
+            self.watch(wid, C)
+            C.foo = "bar"
+            self.assert_events([C])
+
+    def test_event_aggregation(self):
+        class C: pass
+        with self.watcher() as wid:
+            self.watch(wid, C)
+            C.foo = "bar"
+            C.bar = "baz"
+            # only one event registered for both modifications
+            self.assert_events([C])
+
+    def test_lookup_resets_aggregation(self):
+        class C: pass
+        with self.watcher() as wid:
+            self.watch(wid, C)
+            C.foo = "bar"
+            # lookup resets type version tag
+            self.assertEqual(C.foo, "bar")
+            C.bar = "baz"
+            # both events registered
+            self.assert_events([C, C])
+
+    def test_unwatch_type(self):
+        class C: pass
+        with self.watcher() as wid:
+            self.watch(wid, C)
+            C.foo = "bar"
+            self.assertEqual(C.foo, "bar")
+            self.assert_events([C])
+            self.unwatch(wid, C)
+            C.bar = "baz"
+            self.assert_events([C])
+
+    def test_clear_watcher(self):
+        class C: pass
+        # outer watcher is unused, it's just to keep events list alive
+        with self.watcher() as _:
+            with self.watcher() as wid:
+                self.watch(wid, C)
+                C.foo = "bar"
+                self.assertEqual(C.foo, "bar")
+                self.assert_events([C])
+            C.bar = "baz"
+            # Watcher on C has been cleared, no new event
+            self.assert_events([C])
+
+    def test_watch_type_subclass(self):
+        class C: pass
+        class D(C): pass
+        with self.watcher() as wid:
+            self.watch(wid, D)
+            C.foo = "bar"
+            self.assert_events([D])
+
+    def test_error(self):
+        class C: pass
+        with self.watcher(kind=self.ERROR) as wid:
+            self.watch(wid, C)
+            with catch_unraisable_exception() as cm:
+                C.foo = "bar"
+                self.assertIs(cm.unraisable.object, C)
+                self.assertEqual(str(cm.unraisable.exc_value), "boom!")
+            self.assert_events([])
+
+    def test_two_watchers(self):
+        class C1: pass
+        class C2: pass
+        with self.watcher() as wid1:
+            with self.watcher(kind=self.WRAP) as wid2:
+                self.assertNotEqual(wid1, wid2)
+                self.watch(wid1, C1)
+                self.watch(wid2, C2)
+                C1.foo = "bar"
+                C2.hmm = "baz"
+                self.assert_events([C1, [C2]])
+
+    def test_watch_non_type(self):
+        with self.watcher() as wid:
+            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
+                self.watch(wid, 1)
+
+    def test_watch_out_of_range_watcher_id(self):
+        class C: pass
+        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
+            self.watch(-1, C)
+        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
+            self.watch(self.TYPE_MAX_WATCHERS, C)
+
+    def test_watch_unassigned_watcher_id(self):
+        class C: pass
+        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
+            self.watch(1, C)
+
+    def test_unwatch_non_type(self):
+        with self.watcher() as wid:
+            with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"):
+                self.unwatch(wid, 1)
+
+    def test_unwatch_out_of_range_watcher_id(self):
+        class C: pass
+        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
+            self.unwatch(-1, C)
+        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
+            self.unwatch(self.TYPE_MAX_WATCHERS, C)
+
+    def test_unwatch_unassigned_watcher_id(self):
+        class C: pass
+        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
+            self.unwatch(1, C)
+
+    def test_clear_out_of_range_watcher_id(self):
+        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"):
+            self.clear_watcher(-1)
+        with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"):
+            self.clear_watcher(self.TYPE_MAX_WATCHERS)
+
+    def test_clear_unassigned_watcher_id(self):
+        with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"):
+            self.clear_watcher(1)
+
+    def test_no_more_ids_available(self):
+        contexts = [self.watcher() for i in range(self.TYPE_MAX_WATCHERS)]
+        with ExitStack() as stack:
+            for ctx in contexts:
+                stack.enter_context(ctx)
+            with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"):
+                self.add_watcher()
+
+
+if __name__ == "__main__":
+    unittest.main()
index 26e7ffcdc85a1853d19dbe0c1db38c08667b7876..c033dbc4960ca0e64b67bd026d593c17e6010028 100644 (file)
 @MODULE__XXTESTFUZZ_TRUE@_xxtestfuzz _xxtestfuzz/_xxtestfuzz.c _xxtestfuzz/fuzzer.c
 @MODULE__TESTBUFFER_TRUE@_testbuffer _testbuffer.c
 @MODULE__TESTINTERNALCAPI_TRUE@_testinternalcapi _testinternalcapi.c
-@MODULE__TESTCAPI_TRUE@_testcapi _testcapimodule.c _testcapi/vectorcall.c _testcapi/vectorcall_limited.c _testcapi/heaptype.c _testcapi/unicode.c _testcapi/getargs.c _testcapi/pytime.c _testcapi/datetime.c _testcapi/docstring.c _testcapi/mem.c
+@MODULE__TESTCAPI_TRUE@_testcapi _testcapimodule.c _testcapi/vectorcall.c _testcapi/vectorcall_limited.c _testcapi/heaptype.c _testcapi/unicode.c _testcapi/getargs.c _testcapi/pytime.c _testcapi/datetime.c _testcapi/docstring.c _testcapi/mem.c _testcapi/watchers.c
 
 # Some testing modules MUST be built as shared libraries.
 *shared*
index a39007bad6dd3b138bb76595bfcef874cd99cb73..2bc2a0e612573276732852f9db688b473e41f559 100644 (file)
@@ -32,6 +32,7 @@ int _PyTestCapi_Init_PyTime(PyObject *module);
 int _PyTestCapi_Init_DateTime(PyObject *module);
 int _PyTestCapi_Init_Docstring(PyObject *module);
 int _PyTestCapi_Init_Mem(PyObject *module);
+int _PyTestCapi_Init_Watchers(PyObject *module);
 
 #ifdef LIMITED_API_AVAILABLE
 int _PyTestCapi_Init_VectorcallLimited(PyObject *module);
diff --git a/Modules/_testcapi/watchers.c b/Modules/_testcapi/watchers.c
new file mode 100644 (file)
index 0000000..e0d489a
--- /dev/null
@@ -0,0 +1,302 @@
+#include "parts.h"
+
+
+// Test dict watching
+static PyObject *g_dict_watch_events;
+static int g_dict_watchers_installed;
+
+static int
+dict_watch_callback(PyDict_WatchEvent event,
+                    PyObject *dict,
+                    PyObject *key,
+                    PyObject *new_value)
+{
+    PyObject *msg;
+    switch (event) {
+        case PyDict_EVENT_CLEARED:
+            msg = PyUnicode_FromString("clear");
+            break;
+        case PyDict_EVENT_DEALLOCATED:
+            msg = PyUnicode_FromString("dealloc");
+            break;
+        case PyDict_EVENT_CLONED:
+            msg = PyUnicode_FromString("clone");
+            break;
+        case PyDict_EVENT_ADDED:
+            msg = PyUnicode_FromFormat("new:%S:%S", key, new_value);
+            break;
+        case PyDict_EVENT_MODIFIED:
+            msg = PyUnicode_FromFormat("mod:%S:%S", key, new_value);
+            break;
+        case PyDict_EVENT_DELETED:
+            msg = PyUnicode_FromFormat("del:%S", key);
+            break;
+        default:
+            msg = PyUnicode_FromString("unknown");
+    }
+    if (msg == NULL) {
+        return -1;
+    }
+    assert(PyList_Check(g_dict_watch_events));
+    if (PyList_Append(g_dict_watch_events, msg) < 0) {
+        Py_DECREF(msg);
+        return -1;
+    }
+    Py_DECREF(msg);
+    return 0;
+}
+
+static int
+dict_watch_callback_second(PyDict_WatchEvent event,
+                           PyObject *dict,
+                           PyObject *key,
+                           PyObject *new_value)
+{
+    PyObject *msg = PyUnicode_FromString("second");
+    if (msg == NULL) {
+        return -1;
+    }
+    int rc = PyList_Append(g_dict_watch_events, msg);
+    Py_DECREF(msg);
+    if (rc < 0) {
+        return -1;
+    }
+    return 0;
+}
+
+static int
+dict_watch_callback_error(PyDict_WatchEvent event,
+                          PyObject *dict,
+                          PyObject *key,
+                          PyObject *new_value)
+{
+    PyErr_SetString(PyExc_RuntimeError, "boom!");
+    return -1;
+}
+
+static PyObject *
+add_dict_watcher(PyObject *self, PyObject *kind)
+{
+    int watcher_id;
+    assert(PyLong_Check(kind));
+    long kind_l = PyLong_AsLong(kind);
+    if (kind_l == 2) {
+        watcher_id = PyDict_AddWatcher(dict_watch_callback_second);
+    }
+    else if (kind_l == 1) {
+        watcher_id = PyDict_AddWatcher(dict_watch_callback_error);
+    }
+    else {
+        watcher_id = PyDict_AddWatcher(dict_watch_callback);
+    }
+    if (watcher_id < 0) {
+        return NULL;
+    }
+    if (!g_dict_watchers_installed) {
+        assert(!g_dict_watch_events);
+        if (!(g_dict_watch_events = PyList_New(0))) {
+            return NULL;
+        }
+    }
+    g_dict_watchers_installed++;
+    return PyLong_FromLong(watcher_id);
+}
+
+static PyObject *
+clear_dict_watcher(PyObject *self, PyObject *watcher_id)
+{
+    if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
+        return NULL;
+    }
+    g_dict_watchers_installed--;
+    if (!g_dict_watchers_installed) {
+        assert(g_dict_watch_events);
+        Py_CLEAR(g_dict_watch_events);
+    }
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+watch_dict(PyObject *self, PyObject *args)
+{
+    PyObject *dict;
+    int watcher_id;
+    if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
+        return NULL;
+    }
+    if (PyDict_Watch(watcher_id, dict)) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+unwatch_dict(PyObject *self, PyObject *args)
+{
+    PyObject *dict;
+    int watcher_id;
+    if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
+        return NULL;
+    }
+    if (PyDict_Unwatch(watcher_id, dict)) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
+{
+    if (!g_dict_watch_events) {
+        PyErr_SetString(PyExc_RuntimeError, "no watchers active");
+        return NULL;
+    }
+    return Py_NewRef(g_dict_watch_events);
+}
+
+// Test type watchers
+static PyObject *g_type_modified_events;
+static int g_type_watchers_installed;
+
+static int
+type_modified_callback(PyTypeObject *type)
+{
+    assert(PyList_Check(g_type_modified_events));
+    if(PyList_Append(g_type_modified_events, (PyObject *)type) < 0) {
+        return -1;
+    }
+    return 0;
+}
+
+static int
+type_modified_callback_wrap(PyTypeObject *type)
+{
+    assert(PyList_Check(g_type_modified_events));
+    PyObject *list = PyList_New(0);
+    if (list == NULL) {
+        return -1;
+    }
+    if (PyList_Append(list, (PyObject *)type) < 0) {
+        Py_DECREF(list);
+        return -1;
+    }
+    if (PyList_Append(g_type_modified_events, list) < 0) {
+        Py_DECREF(list);
+        return -1;
+    }
+    Py_DECREF(list);
+    return 0;
+}
+
+static int
+type_modified_callback_error(PyTypeObject *type)
+{
+    PyErr_SetString(PyExc_RuntimeError, "boom!");
+    return -1;
+}
+
+static PyObject *
+add_type_watcher(PyObject *self, PyObject *kind)
+{
+    int watcher_id;
+    assert(PyLong_Check(kind));
+    long kind_l = PyLong_AsLong(kind);
+    if (kind_l == 2) {
+        watcher_id = PyType_AddWatcher(type_modified_callback_wrap);
+    }
+    else if (kind_l == 1) {
+        watcher_id = PyType_AddWatcher(type_modified_callback_error);
+    }
+    else {
+        watcher_id = PyType_AddWatcher(type_modified_callback);
+    }
+    if (watcher_id < 0) {
+        return NULL;
+    }
+    if (!g_type_watchers_installed) {
+        assert(!g_type_modified_events);
+        if (!(g_type_modified_events = PyList_New(0))) {
+            return NULL;
+        }
+    }
+    g_type_watchers_installed++;
+    return PyLong_FromLong(watcher_id);
+}
+
+static PyObject *
+clear_type_watcher(PyObject *self, PyObject *watcher_id)
+{
+    if (PyType_ClearWatcher(PyLong_AsLong(watcher_id))) {
+        return NULL;
+    }
+    g_type_watchers_installed--;
+    if (!g_type_watchers_installed) {
+        assert(g_type_modified_events);
+        Py_CLEAR(g_type_modified_events);
+    }
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+get_type_modified_events(PyObject *self, PyObject *Py_UNUSED(args))
+{
+    if (!g_type_modified_events) {
+        PyErr_SetString(PyExc_RuntimeError, "no watchers active");
+        return NULL;
+    }
+    return Py_NewRef(g_type_modified_events);
+}
+
+static PyObject *
+watch_type(PyObject *self, PyObject *args)
+{
+    PyObject *type;
+    int watcher_id;
+    if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
+        return NULL;
+    }
+    if (PyType_Watch(watcher_id, type)) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+unwatch_type(PyObject *self, PyObject *args)
+{
+    PyObject *type;
+    int watcher_id;
+    if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
+        return NULL;
+    }
+    if (PyType_Unwatch(watcher_id, type)) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+static PyMethodDef test_methods[] = {
+    // Dict watchers.
+    {"add_dict_watcher",         add_dict_watcher,        METH_O,       NULL},
+    {"clear_dict_watcher",       clear_dict_watcher,      METH_O,       NULL},
+    {"watch_dict",               watch_dict,              METH_VARARGS, NULL},
+    {"unwatch_dict",             unwatch_dict,            METH_VARARGS, NULL},
+    {"get_dict_watcher_events",  get_dict_watcher_events, METH_NOARGS,  NULL},
+
+    // Type watchers.
+    {"add_type_watcher",         add_type_watcher,        METH_O,       NULL},
+    {"clear_type_watcher",       clear_type_watcher,      METH_O,       NULL},
+    {"watch_type",               watch_type,              METH_VARARGS, NULL},
+    {"unwatch_type",             unwatch_type,            METH_VARARGS, NULL},
+    {"get_type_modified_events", get_type_modified_events, METH_NOARGS, NULL},
+    {NULL},
+};
+
+int
+_PyTestCapi_Init_Watchers(PyObject *mod)
+{
+    if (PyModule_AddFunctions(mod, test_methods) < 0) {
+        return -1;
+    }
+    return 0;
+}
index 2c21782dde16c8506c4fa15a0073826f68a465e6..01b39233c537e70961a5551916759de8378f02e4 100644 (file)
@@ -3390,159 +3390,6 @@ test_tstate_capi(PyObject *self, PyObject *Py_UNUSED(args))
     Py_RETURN_NONE;
 }
 
-
-// Test dict watching
-static PyObject *g_dict_watch_events;
-static int g_dict_watchers_installed;
-
-static int
-dict_watch_callback(PyDict_WatchEvent event,
-                    PyObject *dict,
-                    PyObject *key,
-                    PyObject *new_value)
-{
-    PyObject *msg;
-    switch(event) {
-        case PyDict_EVENT_CLEARED:
-            msg = PyUnicode_FromString("clear");
-            break;
-        case PyDict_EVENT_DEALLOCATED:
-            msg = PyUnicode_FromString("dealloc");
-            break;
-        case PyDict_EVENT_CLONED:
-            msg = PyUnicode_FromString("clone");
-            break;
-        case PyDict_EVENT_ADDED:
-            msg = PyUnicode_FromFormat("new:%S:%S", key, new_value);
-            break;
-        case PyDict_EVENT_MODIFIED:
-            msg = PyUnicode_FromFormat("mod:%S:%S", key, new_value);
-            break;
-        case PyDict_EVENT_DELETED:
-            msg = PyUnicode_FromFormat("del:%S", key);
-            break;
-        default:
-            msg = PyUnicode_FromString("unknown");
-    }
-    if (!msg) {
-        return -1;
-    }
-    assert(PyList_Check(g_dict_watch_events));
-    if (PyList_Append(g_dict_watch_events, msg) < 0) {
-        Py_DECREF(msg);
-        return -1;
-    }
-    Py_DECREF(msg);
-    return 0;
-}
-
-static int
-dict_watch_callback_second(PyDict_WatchEvent event,
-                           PyObject *dict,
-                           PyObject *key,
-                           PyObject *new_value)
-{
-    PyObject *msg = PyUnicode_FromString("second");
-    if (!msg) {
-        return -1;
-    }
-    if (PyList_Append(g_dict_watch_events, msg) < 0) {
-        Py_DECREF(msg);
-        return -1;
-    }
-    Py_DECREF(msg);
-    return 0;
-}
-
-static int
-dict_watch_callback_error(PyDict_WatchEvent event,
-                          PyObject *dict,
-                          PyObject *key,
-                          PyObject *new_value)
-{
-    PyErr_SetString(PyExc_RuntimeError, "boom!");
-    return -1;
-}
-
-static PyObject *
-add_dict_watcher(PyObject *self, PyObject *kind)
-{
-    int watcher_id;
-    assert(PyLong_Check(kind));
-    long kind_l = PyLong_AsLong(kind);
-    if (kind_l == 2) {
-        watcher_id = PyDict_AddWatcher(dict_watch_callback_second);
-    } else if (kind_l == 1) {
-        watcher_id = PyDict_AddWatcher(dict_watch_callback_error);
-    } else {
-        watcher_id = PyDict_AddWatcher(dict_watch_callback);
-    }
-    if (watcher_id < 0) {
-        return NULL;
-    }
-    if (!g_dict_watchers_installed) {
-        assert(!g_dict_watch_events);
-        if (!(g_dict_watch_events = PyList_New(0))) {
-            return NULL;
-        }
-    }
-    g_dict_watchers_installed++;
-    return PyLong_FromLong(watcher_id);
-}
-
-static PyObject *
-clear_dict_watcher(PyObject *self, PyObject *watcher_id)
-{
-    if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
-        return NULL;
-    }
-    g_dict_watchers_installed--;
-    if (!g_dict_watchers_installed) {
-        assert(g_dict_watch_events);
-        Py_CLEAR(g_dict_watch_events);
-    }
-    Py_RETURN_NONE;
-}
-
-static PyObject *
-watch_dict(PyObject *self, PyObject *args)
-{
-    PyObject *dict;
-    int watcher_id;
-    if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
-        return NULL;
-    }
-    if (PyDict_Watch(watcher_id, dict)) {
-        return NULL;
-    }
-    Py_RETURN_NONE;
-}
-
-static PyObject *
-unwatch_dict(PyObject *self, PyObject *args)
-{
-    PyObject *dict;
-    int watcher_id;
-    if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
-        return NULL;
-    }
-    if (PyDict_Unwatch(watcher_id, dict)) {
-        return NULL;
-    }
-    Py_RETURN_NONE;
-}
-
-static PyObject *
-get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
-{
-    if (!g_dict_watch_events) {
-        PyErr_SetString(PyExc_RuntimeError, "no watchers active");
-        return NULL;
-    }
-    return Py_NewRef(g_dict_watch_events);
-}
-
-
 // Test PyFloat_Pack2(), PyFloat_Pack4() and PyFloat_Pack8()
 static PyObject *
 test_float_pack(PyObject *self, PyObject *args)
@@ -3988,128 +3835,6 @@ function_set_kw_defaults(PyObject *self, PyObject *args)
     Py_RETURN_NONE;
 }
 
-
-// type watchers
-
-static PyObject *g_type_modified_events;
-static int g_type_watchers_installed;
-
-static int
-type_modified_callback(PyTypeObject *type)
-{
-    assert(PyList_Check(g_type_modified_events));
-    if(PyList_Append(g_type_modified_events, (PyObject *)type) < 0) {
-        return -1;
-    }
-    return 0;
-}
-
-static int
-type_modified_callback_wrap(PyTypeObject *type)
-{
-    assert(PyList_Check(g_type_modified_events));
-    PyObject *list = PyList_New(0);
-    if (!list) {
-        return -1;
-    }
-    if (PyList_Append(list, (PyObject *)type) < 0) {
-        Py_DECREF(list);
-        return -1;
-    }
-    if (PyList_Append(g_type_modified_events, list) < 0) {
-        Py_DECREF(list);
-        return -1;
-    }
-    Py_DECREF(list);
-    return 0;
-}
-
-static int
-type_modified_callback_error(PyTypeObject *type)
-{
-    PyErr_SetString(PyExc_RuntimeError, "boom!");
-    return -1;
-}
-
-static PyObject *
-add_type_watcher(PyObject *self, PyObject *kind)
-{
-    int watcher_id;
-    assert(PyLong_Check(kind));
-    long kind_l = PyLong_AsLong(kind);
-    if (kind_l == 2) {
-        watcher_id = PyType_AddWatcher(type_modified_callback_wrap);
-    } else if (kind_l == 1) {
-        watcher_id = PyType_AddWatcher(type_modified_callback_error);
-    } else {
-        watcher_id = PyType_AddWatcher(type_modified_callback);
-    }
-    if (watcher_id < 0) {
-        return NULL;
-    }
-    if (!g_type_watchers_installed) {
-        assert(!g_type_modified_events);
-        if (!(g_type_modified_events = PyList_New(0))) {
-            return NULL;
-        }
-    }
-    g_type_watchers_installed++;
-    return PyLong_FromLong(watcher_id);
-}
-
-static PyObject *
-clear_type_watcher(PyObject *self, PyObject *watcher_id)
-{
-    if (PyType_ClearWatcher(PyLong_AsLong(watcher_id))) {
-        return NULL;
-    }
-    g_type_watchers_installed--;
-    if (!g_type_watchers_installed) {
-        assert(g_type_modified_events);
-        Py_CLEAR(g_type_modified_events);
-    }
-    Py_RETURN_NONE;
-}
-
-static PyObject *
-get_type_modified_events(PyObject *self, PyObject *Py_UNUSED(args))
-{
-    if (!g_type_modified_events) {
-        PyErr_SetString(PyExc_RuntimeError, "no watchers active");
-        return NULL;
-    }
-    return Py_NewRef(g_type_modified_events);
-}
-
-static PyObject *
-watch_type(PyObject *self, PyObject *args)
-{
-    PyObject *type;
-    int watcher_id;
-    if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
-        return NULL;
-    }
-    if (PyType_Watch(watcher_id, type)) {
-        return NULL;
-    }
-    Py_RETURN_NONE;
-}
-
-static PyObject *
-unwatch_type(PyObject *self, PyObject *args)
-{
-    PyObject *type;
-    int watcher_id;
-    if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
-        return NULL;
-    }
-    if (PyType_Unwatch(watcher_id, type)) {
-        return NULL;
-    }
-    Py_RETURN_NONE;
-}
-
-
 static PyObject *test_buildvalue_issue38913(PyObject *, PyObject *);
 
 static PyMethodDef TestMethods[] = {
@@ -4259,11 +3984,6 @@ static PyMethodDef TestMethods[] = {
     {"settrace_to_record", settrace_to_record, METH_O, NULL},
     {"test_macros", test_macros, METH_NOARGS, NULL},
     {"clear_managed_dict", clear_managed_dict, METH_O, NULL},
-    {"add_dict_watcher", add_dict_watcher, METH_O, NULL},
-    {"clear_dict_watcher", clear_dict_watcher, METH_O, NULL},
-    {"watch_dict", watch_dict, METH_VARARGS, NULL},
-    {"unwatch_dict", unwatch_dict, METH_VARARGS, NULL},
-    {"get_dict_watcher_events", get_dict_watcher_events, METH_NOARGS, NULL},
     {"function_get_code", function_get_code, METH_O, NULL},
     {"function_get_globals", function_get_globals, METH_O, NULL},
     {"function_get_module", function_get_module, METH_O, NULL},
@@ -4271,11 +3991,6 @@ static PyMethodDef TestMethods[] = {
     {"function_set_defaults", function_set_defaults, METH_VARARGS, NULL},
     {"function_get_kw_defaults", function_get_kw_defaults, METH_O, NULL},
     {"function_set_kw_defaults", function_set_kw_defaults, METH_VARARGS, NULL},
-    {"add_type_watcher", add_type_watcher, METH_O, NULL},
-    {"clear_type_watcher", clear_type_watcher, METH_O, NULL},
-    {"watch_type", watch_type, METH_VARARGS, NULL},
-    {"unwatch_type", unwatch_type, METH_VARARGS, NULL},
-    {"get_type_modified_events", get_type_modified_events, METH_NOARGS, NULL},
     {NULL, NULL} /* sentinel */
 };
 
@@ -5096,6 +4811,9 @@ PyInit__testcapi(void)
     if (_PyTestCapi_Init_Mem(m) < 0) {
         return NULL;
     }
+    if (_PyTestCapi_Init_Watchers(m) < 0) {
+        return NULL;
+    }
 
 #ifndef LIMITED_API_AVAILABLE
     PyModule_AddObjectRef(m, "LIMITED_API_AVAILABLE", Py_False);
index 0151d85a27fe8deae2dad01729382aae56026b79..42e7d30ac8816c5f8a27e2b19f09fc16bbbaef99 100644 (file)
     <ClCompile Include="..\Modules\_testcapi\datetime.c" />
     <ClCompile Include="..\Modules\_testcapi\docstring.c" />
     <ClCompile Include="..\Modules\_testcapi\mem.c" />
+    <ClCompile Include="..\Modules\_testcapi\watchers.c" />
   </ItemGroup>
   <ItemGroup>
     <ResourceCompile Include="..\PC\python_nt.rc" />
index c30c41bf5ee48a39ada49045ce767eb2a74f22dd..75652c383da3413145aea93734a05f887dd4ca22 100644 (file)
@@ -39,6 +39,9 @@
     <ClCompile Include="..\Modules\_testcapi\mem.c">
       <Filter>Source Files</Filter>
     </ClCompile>
+    <ClCompile Include="..\Modules\_testcapi\watchers.c">
+      <Filter>Source Files</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ResourceCompile Include="..\PC\python_nt.rc">