]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
bpo-40221: Update multiprocessing to use _at_fork_reinit (GH-19511)
authorDong-hee Na <donghee.na92@gmail.com>
Tue, 14 Apr 2020 16:35:36 +0000 (01:35 +0900)
committerGitHub <noreply@github.com>
Tue, 14 Apr 2020 16:35:36 +0000 (01:35 +0900)
Lib/multiprocessing/queues.py
Lib/multiprocessing/resource_sharer.py

index 835070118387ea70a929835856c6dd0bb865bd35..c0a284d10c8070df9878e08fdcc1bcc0ff899de1 100644 (file)
@@ -49,8 +49,7 @@ class Queue(object):
         self._sem = ctx.BoundedSemaphore(maxsize)
         # For use by concurrent.futures
         self._ignore_epipe = False
-
-        self._after_fork()
+        self._reset()
 
         if sys.platform != 'win32':
             register_after_fork(self, Queue._after_fork)
@@ -63,11 +62,17 @@ class Queue(object):
     def __setstate__(self, state):
         (self._ignore_epipe, self._maxsize, self._reader, self._writer,
          self._rlock, self._wlock, self._sem, self._opid) = state
-        self._after_fork()
+        self._reset()
 
     def _after_fork(self):
         debug('Queue._after_fork()')
-        self._notempty = threading.Condition(threading.Lock())
+        self._reset(after_fork=True)
+
+    def _reset(self, after_fork=False):
+        if after_fork:
+            self._notempty._at_fork_reinit()
+        else:
+            self._notempty = threading.Condition(threading.Lock())
         self._buffer = collections.deque()
         self._thread = None
         self._jointhread = None
index 8d5c9900f69fede88b477e49d312ae631128266e..66076509a1202e7a1b4d8a481f64621a4bfbbf3e 100644 (file)
@@ -63,7 +63,6 @@ class _ResourceSharer(object):
     def __init__(self):
         self._key = 0
         self._cache = {}
-        self._old_locks = []
         self._lock = threading.Lock()
         self._listener = None
         self._address = None
@@ -113,10 +112,7 @@ class _ResourceSharer(object):
         for key, (send, close) in self._cache.items():
             close()
         self._cache.clear()
-        # If self._lock was locked at the time of the fork, it may be broken
-        # -- see issue 6721.  Replace it without letting it be gc'ed.
-        self._old_locks.append(self._lock)
-        self._lock = threading.Lock()
+        self._lock._at_fork_reinit()
         if self._listener is not None:
             self._listener.close()
         self._listener = None