git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-121313: Limit the reading size from pipes to their default buffer size on POSIX...
authorAlexander P. <37912520+aplaikner@users.noreply.github.com>
Sat, 31 Aug 2024 05:57:22 +0000 (07:57 +0200)
committerGitHub <noreply@github.com>
Sat, 31 Aug 2024 05:57:22 +0000 (22:57 -0700)
This greatly reduces memory overallocation and overhead when multiprocessing sends data larger than the default pipe buffer over its pipes between processes. See https://github.com/python/cpython/issues/121313 for the analysis.

Lib/multiprocessing/connection.py
Misc/NEWS.d/next/C API/2024-07-03-10-11-53.gh-issue-121313.D7gARW.rst [new file with mode: 0644]

index b7e1e132172d0203d9f584854a7bc7b30aa08bd0..d84b52fe6d4f888cd9d25a12cd6d698d4e63f7ec 100644 (file)
@@ -11,13 +11,14 @@ __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
 
 import errno
 import io
+import itertools
 import os
+import stat
 import sys
 import socket
 import struct
-import time
 import tempfile
-import itertools
+import time
 
 
 from . import util
@@ -360,6 +361,11 @@ if _winapi:
             f.write(ov.getbuffer())
             return f
 
+"""
+The default size of a pipe on Linux systems is 16 times the base page size:
+https://man7.org/linux/man-pages/man7/pipe.7.html
+"""
+PAGES_PER_PIPE = 16
 
 class Connection(_ConnectionBase):
     """
@@ -372,11 +378,14 @@ class Connection(_ConnectionBase):
             _close(self._handle)
         _write = _multiprocessing.send
         _read = _multiprocessing.recv
+        _default_pipe_size = 0
     else:
         def _close(self, _close=os.close):
             _close(self._handle)
         _write = os.write
         _read = os.read
+        _base_page_size = os.sysconf(os.sysconf_names['SC_PAGESIZE'])
+        _default_pipe_size = _base_page_size * PAGES_PER_PIPE
 
     def _send(self, buf, write=_write):
         remaining = len(buf)
@@ -391,8 +400,14 @@ class Connection(_ConnectionBase):
         buf = io.BytesIO()
         handle = self._handle
         remaining = size
+        is_pipe = False
+        if size > self._default_pipe_size > 0:
+            mode = os.fstat(handle).st_mode
+            is_pipe = stat.S_ISFIFO(mode)
+        limit = self._default_pipe_size if is_pipe else remaining
         while remaining > 0:
-            chunk = read(handle, remaining)
+            to_read = min(limit, remaining)
+            chunk = read(handle, to_read)
             n = len(chunk)
             if n == 0:
                 if remaining == size:
diff --git a/Misc/NEWS.d/next/C API/2024-07-03-10-11-53.gh-issue-121313.D7gARW.rst b/Misc/NEWS.d/next/C API/2024-07-03-10-11-53.gh-issue-121313.D7gARW.rst
new file mode 100644 (file)
index 0000000..06abce9
--- /dev/null
@@ -0,0 +1 @@
+Limit the read size in ``multiprocessing.connection.Connection._recv`` for pipes to the default pipe size of 16 times the base page size, in order to avoid memory overallocation and unnecessary memory management system calls.