]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-108294: Add error handling for time.sleep audit event (GH-108363)
authorPetr Viktorin <encukou@gmail.com>
Tue, 5 Sep 2023 08:25:08 +0000 (10:25 +0200)
committerGitHub <noreply@github.com>
Tue, 5 Sep 2023 08:25:08 +0000 (10:25 +0200)
I've also cleaned the tests up a bit to make this easier to test.

Lib/test/audit-tests.py
Lib/test/test_audit.py
Modules/timemodule.c

index f14c2635d39390074ac4bea40001cc5eb9d12e8c..f0cedde308d53b83b5577bb6dd56dc33c17a5be9 100644 (file)
@@ -186,7 +186,7 @@ def test_monkeypatch():
     )
 
 
-def test_open():
+def test_open(testfn):
     # SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open()
     try:
         import ssl
@@ -199,11 +199,11 @@ def test_open():
     # All of them should fail
     with TestHook(raise_on_events={"open"}) as hook:
         for fn, *args in [
-            (open, sys.argv[2], "r"),
+            (open, testfn, "r"),
             (open, sys.executable, "rb"),
             (open, 3, "wb"),
-            (open, sys.argv[2], "w", -1, None, None, None, False, lambda *a: 1),
-            (load_dh_params, sys.argv[2]),
+            (open, testfn, "w", -1, None, None, None, False, lambda *a: 1),
+            (load_dh_params, testfn),
         ]:
             if not fn:
                 continue
@@ -216,11 +216,11 @@ def test_open():
         [
             i
             for i in [
-                (sys.argv[2], "r"),
+                (testfn, "r"),
                 (sys.executable, "r"),
                 (3, "w"),
-                (sys.argv[2], "w"),
-                (sys.argv[2], "rb") if load_dh_params else None,
+                (testfn, "w"),
+                (testfn, "rb") if load_dh_params else None,
             ]
             if i is not None
         ],
@@ -517,12 +517,15 @@ def test_not_in_gc():
             assert hook not in o
 
 
-def test_time():
+def test_time(mode):
     import time
 
     def hook(event, args):
         if event.startswith("time."):
-            print(event, *args)
+            if mode == 'print':
+                print(event, *args)
+            elif mode == 'fail':
+                raise AssertionError('hook failed')
     sys.addaudithook(hook)
 
     time.sleep(0)
@@ -549,4 +552,4 @@ if __name__ == "__main__":
     suppress_msvcrt_asserts()
 
     test = sys.argv[1]
-    globals()[test]()
+    globals()[test](*sys.argv[2:])
index 3a15835917cc327ce127cb683ad760acd2f8f416..47e5832d311bd1c391837fbf6254125c4dcc4318 100644 (file)
@@ -19,7 +19,7 @@ class AuditTest(unittest.TestCase):
     maxDiff = None
 
     @support.requires_subprocess()
-    def do_test(self, *args):
+    def run_test_in_subprocess(self, *args):
         with subprocess.Popen(
             [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
             encoding="utf-8",
@@ -27,27 +27,26 @@ class AuditTest(unittest.TestCase):
             stderr=subprocess.PIPE,
         ) as p:
             p.wait()
-            sys.stdout.writelines(p.stdout)
-            sys.stderr.writelines(p.stderr)
-            if p.returncode:
-                self.fail("".join(p.stderr))
+            return p, p.stdout.read(), p.stderr.read()
 
-    @support.requires_subprocess()
-    def run_python(self, *args):
+    def do_test(self, *args):
+        proc, stdout, stderr = self.run_test_in_subprocess(*args)
+
+        sys.stdout.write(stdout)
+        sys.stderr.write(stderr)
+        if proc.returncode:
+            self.fail(stderr)
+
+    def run_python(self, *args, expect_stderr=False):
         events = []
-        with subprocess.Popen(
-            [sys.executable, "-X utf8", AUDIT_TESTS_PY, *args],
-            encoding="utf-8",
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-        ) as p:
-            p.wait()
-            sys.stderr.writelines(p.stderr)
-            return (
-                p.returncode,
-                [line.strip().partition(" ") for line in p.stdout],
-                "".join(p.stderr),
-            )
+        proc, stdout, stderr = self.run_test_in_subprocess(*args)
+        if not expect_stderr or support.verbose:
+            sys.stderr.write(stderr)
+        return (
+            proc.returncode,
+            [line.strip().partition(" ") for line in stdout.splitlines()],
+            stderr,
+        )
 
     def test_basic(self):
         self.do_test("test_basic")
@@ -257,7 +256,7 @@ class AuditTest(unittest.TestCase):
             self.fail(stderr)
 
     def test_time(self):
-        returncode, events, stderr = self.run_python("test_time")
+        returncode, events, stderr = self.run_python("test_time", "print")
         if returncode:
             self.fail(stderr)
 
@@ -271,6 +270,11 @@ class AuditTest(unittest.TestCase):
 
         self.assertEqual(actual, expected)
 
+    def test_time_fail(self):
+        returncode, events, stderr = self.run_python("test_time", "fail",
+                                                     expect_stderr=True)
+        self.assertNotEqual(returncode, 0)
+        self.assertIn('hook failed', stderr.splitlines()[-1])
 
     def test_sys_monitoring_register_callback(self):
         returncode, events, stderr = self.run_python("test_sys_monitoring_register_callback")
index a2b66520ee885d3b9a5f840cee6c2d2dae1f3b01..6a872a285d289b8c19888331b6e32643a00a8644 100644 (file)
@@ -413,7 +413,9 @@ Return the clk_id of a thread's CPU time clock.");
 static PyObject *
 time_sleep(PyObject *self, PyObject *timeout_obj)
 {
-    PySys_Audit("time.sleep", "O", timeout_obj);
+    if (PySys_Audit("time.sleep", "O", timeout_obj) < 0) {
+        return NULL;
+    }
 
     _PyTime_t timeout;
     if (_PyTime_FromSecondsObject(&timeout, timeout_obj, _PyTime_ROUND_TIMEOUT))