git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
[3.13] gh-142206: multiprocessing.resource_tracker: Decode messages using older proto...
author Miss Islington (bot) <31488909+miss-islington@users.noreply.github.com>
Fri, 5 Dec 2025 12:13:05 +0000 (13:13 +0100)
committer GitHub <noreply@github.com>
Fri, 5 Dec 2025 12:13:05 +0000 (12:13 +0000)
[3.14] gh-142206: multiprocessing.resource_tracker: Decode messages using older protocol (GH-142215) (GH-142285)

(cherry picked from commit 4172644d78d58189e46424af0aea302b1d78e2de)

Difference from the original commit: the default in 3.14 is to use
the simpler original protocol (except for filenames with newlines).
(cherry picked from commit f130b06da30616dd4dae363c7d8af79b540a7971)

Co-authored-by: Petr Viktorin <encukou@gmail.com>
Lib/multiprocessing/resource_tracker.py
Lib/test/_test_multiprocessing.py
Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst [new file with mode: 0644]

index a34e38d19d44993cc73497b795745ab19bc04bc4..22e3bbcf21b0172d20b134a3599af34b0c68a3f5 100644 (file)
@@ -72,6 +72,13 @@ class ResourceTracker(object):
         self._exitcode = None
         self._reentrant_messages = deque()
 
+        # True to use colon-separated lines, rather than JSON lines,
+        # for internal communication. (Mainly for testing).
+        # Filenames not supported by the simple format will always be sent
+        # using JSON.
+        # The reader should understand all formats.
+        self._use_simple_format = True
+
     def _reentrant_call_error(self):
         # gh-109629: this happens if an explicit call to the ResourceTracker
         # gets interrupted by a garbage collection, invoking a finalizer (*)
@@ -204,7 +211,9 @@ class ResourceTracker(object):
             os.close(r)
 
     def _make_probe_message(self):
-        """Return a JSON-encoded probe message."""
+        """Return a probe message."""
+        if self._use_simple_format:
+            return b'PROBE:0:noop\n'
         return (
             json.dumps(
                 {"cmd": "PROBE", "rtype": "noop"},
@@ -271,6 +280,15 @@ class ResourceTracker(object):
         assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
 
     def _send(self, cmd, name, rtype):
+        if self._use_simple_format and '\n' not in name:
+            msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
+            if len(msg) > 512:
+                # posix guarantees that writes to a pipe of less than PIPE_BUF
+                # bytes are atomic, and that PIPE_BUF >= 512
+                raise ValueError('msg too long')
+            self._ensure_running_and_write(msg)
+            return
+
         # POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on Linux)
         # bytes are atomic. Therefore, we want the message to be shorter than 512 bytes.
         # POSIX shm_open() and sem_open() require the name, including its leading slash,
@@ -290,6 +308,7 @@ class ResourceTracker(object):
 
         # The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction.
         assert len(msg) <= 512, f"internal error: message too long ({len(msg)} bytes)"
+        assert msg.startswith(b'{')
 
         self._ensure_running_and_write(msg)
 
@@ -300,6 +319,30 @@ unregister = _resource_tracker.unregister
 getfd = _resource_tracker.getfd
 
 
+def _decode_message(line):
+    if line.startswith(b'{'):
+        try:
+            obj = json.loads(line.decode('ascii'))
+        except Exception as e:
+            raise ValueError("malformed resource_tracker message: %r" % (line,)) from e
+
+        cmd = obj["cmd"]
+        rtype = obj["rtype"]
+        b64  = obj.get("base64_name", "")
+
+        if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
+            raise ValueError("malformed resource_tracker fields: %r" % (obj,))
+
+        try:
+            name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
+        except ValueError as e:
+            raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
+    else:
+        cmd, rest = line.strip().decode('ascii').split(':', maxsplit=1)
+        name, rtype = rest.rsplit(':', maxsplit=1)
+    return cmd, rtype, name
+
+
 def main(fd):
     '''Run resource tracker.'''
     # protect the process from ^C and "killall python" etc
@@ -322,23 +365,7 @@ def main(fd):
         with open(fd, 'rb') as f:
             for line in f:
                 try:
-                    try:
-                        obj = json.loads(line.decode('ascii'))
-                    except Exception as e:
-                        raise ValueError("malformed resource_tracker message: %r" % (line,)) from e
-
-                    cmd = obj["cmd"]
-                    rtype = obj["rtype"]
-                    b64  = obj.get("base64_name", "")
-
-                    if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
-                        raise ValueError("malformed resource_tracker fields: %r" % (obj,))
-
-                    try:
-                        name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
-                    except ValueError as e:
-                        raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
-
+                    cmd, rtype, name = _decode_message(line)
                     cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
                     if cleanup_func is None:
                         raise ValueError(
index 8e7cac42568f997bf4a735333b89554974d0bf33..7b46e4297c15035d2a44e21dd992308a3a8dfc06 100644 (file)
@@ -38,7 +38,7 @@ from test.support import script_helper
 from test.support import socket_helper
 from test.support import threading_helper
 from test.support import warnings_helper
-
+from test.support import subTests
 
 # Skip tests if _multiprocessing wasn't built.
 _multiprocessing = import_helper.import_module('_multiprocessing')
@@ -4141,6 +4141,19 @@ class _TestSharedCTypes(BaseTestCase):
         self.assertEqual(bar.z, 2 ** 33)
 
 
+def resource_tracker_format_subtests(func):
+    """Run given test using both resource tracker communication formats"""
+    def _inner(self, *args, **kwargs):
+        tracker = resource_tracker._resource_tracker
+        for use_simple_format in False, True:
+            with (
+                self.subTest(use_simple_format=use_simple_format),
+                unittest.mock.patch.object(
+                    tracker, '_use_simple_format', use_simple_format)
+            ):
+                func(self, *args, **kwargs)
+    return _inner
+
 @unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
 @hashlib_helper.requires_hashdigest('sha256')
 class _TestSharedMemory(BaseTestCase):
@@ -4418,6 +4431,7 @@ class _TestSharedMemory(BaseTestCase):
         smm.shutdown()
 
     @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
+    @resource_tracker_format_subtests
     def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
         # bpo-36867: test that a SharedMemoryManager uses the
         # same resource_tracker process as its parent.
@@ -4668,6 +4682,7 @@ class _TestSharedMemory(BaseTestCase):
                     "shared_memory objects to clean up at shutdown", err)
 
     @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
+    @resource_tracker_format_subtests
     def test_shared_memory_untracking(self):
         # gh-82300: When a separate Python process accesses shared memory
         # with track=False, it must not cause the memory to be deleted
@@ -4695,6 +4710,7 @@ class _TestSharedMemory(BaseTestCase):
             mem.close()
 
     @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
+    @resource_tracker_format_subtests
     def test_shared_memory_tracking(self):
         # gh-82300: When a separate Python process accesses shared memory
         # with track=True, it must cause the memory to be deleted when
@@ -6794,13 +6810,18 @@ class SemLockTests(unittest.TestCase):
 
 @unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
 class TestSharedMemoryNames(unittest.TestCase):
-    def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self):
+    @subTests('use_simple_format', (True, False))
+    def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(
+            self, use_simple_format):
         # Test script that creates and cleans up shared memory with colon in name
         test_script = textwrap.dedent("""
             import sys
             from multiprocessing import shared_memory
+            from multiprocessing import resource_tracker
             import time
 
+            resource_tracker._resource_tracker._use_simple_format = %s
+
             # Test various patterns of colons in names
             test_names = [
                 "a:b",
@@ -6828,7 +6849,7 @@ class TestSharedMemoryNames(unittest.TestCase):
                     sys.exit(1)
 
             print("SUCCESS")
-        """)
+        """ % use_simple_format)
 
         rc, out, err = script_helper.assert_python_ok("-c", test_script)
         self.assertIn(b"SUCCESS", out)
diff --git a/Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst b/Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst
new file mode 100644 (file)
index 0000000..2fc2e30
--- /dev/null
@@ -0,0 +1,7 @@
+The resource tracker in the :mod:`multiprocessing` module now uses the
+original communication protocol, as in Python 3.14.0 and below,
+by default.
+This avoids issues with upgrading Python while it is running.
+(Note that such 'in-place' upgrades are not tested.)
+The tracker remains compatible with subprocesses that use the new protocol
+(that is, subprocesses using Python 3.13.10, 3.14.1 and 3.15).