]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
Stop making fork server have copy of semaphore_tracker_fd.
authorRichard Oudkerk <shibturn@gmail.com>
Thu, 22 Aug 2013 10:38:57 +0000 (11:38 +0100)
committerRichard Oudkerk <shibturn@gmail.com>
Thu, 22 Aug 2013 10:38:57 +0000 (11:38 +0100)
Lib/multiprocessing/forkserver.py
Lib/multiprocessing/popen_spawn_posix.py
Lib/multiprocessing/popen_spawn_win32.py
Lib/multiprocessing/semaphore_tracker.py
Lib/multiprocessing/spawn.py

index c0ac9935205fdeae8b0972cd0700a811e98c8a7a..208bd4e31dcf4efd039f814fb3d976abb321abbd 100644 (file)
@@ -10,6 +10,7 @@ import threading
 from . import connection
 from . import process
 from . import reduction
+from . import semaphore_tracker
 from . import spawn
 from . import util
 
@@ -55,13 +56,14 @@ def connect_to_new_process(fds):
     The calling process should write to data_w the pickled preparation and
     process data.
     '''
-    if len(fds) + 3 >= MAXFDS_TO_SEND:
+    if len(fds) + 4 >= MAXFDS_TO_SEND:
         raise ValueError('too many fds')
     with socket.socket(socket.AF_UNIX) as client:
         client.connect(_forkserver_address)
         parent_r, child_w = util.pipe()
         child_r, parent_w = util.pipe()
-        allfds = [child_r, child_w, _forkserver_alive_fd]
+        allfds = [child_r, child_w, _forkserver_alive_fd,
+                  semaphore_tracker._semaphore_tracker_fd]
         allfds += fds
         try:
             reduction.sendfds(client, allfds)
@@ -88,8 +90,6 @@ def ensure_running():
             return
 
         assert all(type(mod) is str for mod in _preload_modules)
-        config = process.current_process()._config
-        semaphore_tracker_fd = config['semaphore_tracker_fd']
         cmd = ('from multiprocessing.forkserver import main; ' +
                'main(%d, %d, %r, **%r)')
 
@@ -110,7 +110,7 @@ def ensure_running():
             # when they all terminate the read end becomes ready.
             alive_r, alive_w = util.pipe()
             try:
-                fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
+                fds_to_pass = [listener.fileno(), alive_r]
                 cmd %= (listener.fileno(), alive_r, _preload_modules, data)
                 exe = spawn.get_executable()
                 args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
@@ -197,7 +197,8 @@ def _serve_one(s, listener, alive_r, handler):
     fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
     s.close()
     assert len(fds) <= MAXFDS_TO_SEND
-    child_r, child_w, _forkserver_alive_fd, *_inherited_fds = fds
+    child_r, child_w, _forkserver_alive_fd, stfd, *_inherited_fds = fds
+    semaphore_tracker._semaphore_tracker_fd = stfd
 
     # send pid to client processes
     write_unsigned(child_w, os.getpid())
index de262aaa02fcbf1f29a5c14b08798c9d6f938f2a..e67915d9c9207a65fb7dc66a3b1cd0d0cda32b13 100644 (file)
@@ -40,7 +40,8 @@ class Popen(popen_fork.Popen):
         return fd
 
     def _launch(self, process_obj):
-        tracker_fd = current_process()._config['semaphore_tracker_fd']
+        from . import semaphore_tracker
+        tracker_fd = semaphore_tracker._semaphore_tracker_fd
         self._fds.append(tracker_fd)
         prep_data = spawn.get_preparation_data(process_obj._name)
         fp = io.BytesIO()
@@ -55,7 +56,8 @@ class Popen(popen_fork.Popen):
         try:
             parent_r, child_w = util.pipe()
             child_r, parent_w = util.pipe()
-            cmd = spawn.get_command_line() + [str(child_r)]
+            cmd = spawn.get_command_line(tracker_fd=tracker_fd,
+                                         pipe_handle=child_r)
             self._fds.extend([child_r, child_w])
             self.pid = util.spawnv_passfds(spawn.get_executable(),
                                            cmd, self._fds)
index 7e0c4b3b501c751d8f30dc84acfb2d537a7f6361..f1e9aae332e441f802ba1e17533f5a0f29aedf2b 100644 (file)
@@ -32,13 +32,14 @@ class Popen(object):
 
     def __init__(self, process_obj):
         prep_data = spawn.get_preparation_data(process_obj._name)
-        cmd = ' '.join('"%s"' % x for x in spawn.get_command_line())
 
         # read end of pipe will be "stolen" by the child process
         # -- see spawn_main() in spawn.py.
         rhandle, whandle = _winapi.CreatePipe(None, 0)
         wfd = msvcrt.open_osfhandle(whandle, 0)
-        cmd += ' {} {}'.format(os.getpid(), rhandle)
+        cmd = spawn.get_command_line(parent_pid=os.getpid(),
+                                     pipe_handle=rhandle)
+        cmd = ' '.join('"%s"' % x for x in cmd)
 
         with open(wfd, 'wb', closefd=True) as to_child:
             # start process
index 4a2d63658d3637a78158aad8a7491eda0433b4ae..99a0dd4f8fb80b894617f0b56341ae37231345f1 100644 (file)
@@ -26,6 +26,7 @@ from . import current_process
 __all__ = ['ensure_running', 'register', 'unregister']
 
 
+_semaphore_tracker_fd = None
 _lock = threading.Lock()
 
 
@@ -34,9 +35,9 @@ def ensure_running():
 
     This can be run from any process.  Usually a child process will use
     the semaphore created by its parent.'''
+    global _semaphore_tracker_fd
     with _lock:
-        config = current_process()._config
-        if config.get('semaphore_tracker_fd') is not None:
+        if _semaphore_tracker_fd is not None:
             return
         fds_to_pass = []
         try:
@@ -44,7 +45,7 @@ def ensure_running():
         except Exception:
             pass
         cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
-        r, semaphore_tracker_fd = util.pipe()
+        r, w = util.pipe()
         try:
             fds_to_pass.append(r)
             # process will out live us, so no need to wait on pid
@@ -53,10 +54,10 @@ def ensure_running():
             args += ['-c', cmd % r]
             util.spawnv_passfds(exe, args, fds_to_pass)
         except:
-            os.close(semaphore_tracker_fd)
+            os.close(w)
             raise
         else:
-            config['semaphore_tracker_fd'] = semaphore_tracker_fd
+            _semaphore_tracker_fd = w
         finally:
             os.close(r)
 
@@ -77,8 +78,7 @@ def _send(cmd, name):
         # posix guarantees that writes to a pipe of less than PIPE_BUF
         # bytes are atomic, and that PIPE_BUF >= 512
         raise ValueError('name too long')
-    fd = current_process()._config['semaphore_tracker_fd']
-    nbytes = os.write(fd, msg)
+    nbytes = os.write(_semaphore_tracker_fd, msg)
     assert nbytes == len(msg)
 
 
index 83561dbd3faae695c375061da0ab521088d48c60..9c4acee5f69edaf093f595c3f379d330b60bf462 100644 (file)
@@ -66,32 +66,33 @@ def freeze_support():
         sys.exit()
 
 
-def get_command_line():
+def get_command_line(**kwds):
     '''
     Returns prefix of command line used for spawning a child process
     '''
     if getattr(sys, 'frozen', False):
         return [sys.executable, '--multiprocessing-fork']
     else:
-        prog = 'from multiprocessing.spawn import spawn_main; spawn_main()'
+        prog = 'from multiprocessing.spawn import spawn_main; spawn_main(%s)'
+        prog %= ', '.join('%s=%r' % item for item in kwds.items())
         opts = util._args_from_interpreter_flags()
         return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
 
 
-def spawn_main():
+def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
     '''
     Run code specifed by data received over pipe
     '''
     assert is_forking(sys.argv)
-    handle = int(sys.argv[-1])
     if sys.platform == 'win32':
         import msvcrt
         from .reduction import steal_handle
-        pid = int(sys.argv[-2])
-        new_handle = steal_handle(pid, handle)
+        new_handle = steal_handle(parent_pid, pipe_handle)
         fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
     else:
-        fd = handle
+        from . import semaphore_tracker
+        semaphore_tracker._semaphore_tracker_fd = tracker_fd
+        fd = pipe_handle
     exitcode = _main(fd)
     sys.exit(exitcode)