]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
bpo-29293: multiprocessing.Condition.notify() lacks parameter `n` (#2480)
authorAntoine Pitrou <pitrou@free.fr>
Tue, 4 Jul 2017 06:59:22 +0000 (08:59 +0200)
committerGitHub <noreply@github.com>
Tue, 4 Jul 2017 06:59:22 +0000 (08:59 +0200)
* bpo-29293: multiprocessing.Condition.notify() lacks parameter `n`

* Add NEWS blurb

Lib/multiprocessing/managers.py
Lib/multiprocessing/synchronize.py
Lib/test/_test_multiprocessing.py
Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst [new file with mode: 0644]

index 43dd02a5a391c12c8d239355ff009b79f92691f6..cae1c10734ba5a58cf69ad47a85263eaeeddaea9 100644 (file)
@@ -999,8 +999,8 @@ class ConditionProxy(AcquirerProxy):
     _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
     def wait(self, timeout=None):
         return self._callmethod('wait', (timeout,))
-    def notify(self):
-        return self._callmethod('notify')
+    def notify(self, n=1):
+        return self._callmethod('notify', (n,))
     def notify_all(self):
         return self._callmethod('notify_all')
     def wait_for(self, predicate, timeout=None):
index d4bdf0e8b1737f6342b0ff11e38d53faf87f0dbb..0590ed68f53ab20be13ad3328b510ba207884aa3 100644 (file)
@@ -268,24 +268,7 @@ class Condition(object):
             for i in range(count):
                 self._lock.acquire()
 
-    def notify(self):
-        assert self._lock._semlock._is_mine(), 'lock is not owned'
-        assert not self._wait_semaphore.acquire(False)
-
-        # to take account of timeouts since last notify() we subtract
-        # woken_count from sleeping_count and rezero woken_count
-        while self._woken_count.acquire(False):
-            res = self._sleeping_count.acquire(False)
-            assert res
-
-        if self._sleeping_count.acquire(False): # try grabbing a sleeper
-            self._wait_semaphore.release()      # wake up one sleeper
-            self._woken_count.acquire()         # wait for the sleeper to wake
-
-            # rezero _wait_semaphore in case a timeout just happened
-            self._wait_semaphore.acquire(False)
-
-    def notify_all(self):
+    def notify(self, n=1):
         assert self._lock._semlock._is_mine(), 'lock is not owned'
         assert not self._wait_semaphore.acquire(False)
 
@@ -296,7 +279,7 @@ class Condition(object):
             assert res
 
         sleepers = 0
-        while self._sleeping_count.acquire(False):
+        while sleepers < n and self._sleeping_count.acquire(False):
             self._wait_semaphore.release()        # wake up one sleeper
             sleepers += 1
 
@@ -308,6 +291,9 @@ class Condition(object):
             while self._wait_semaphore.acquire(False):
                 pass
 
+    def notify_all(self):
+        self.notify(n=sys.maxsize)
+
     def wait_for(self, predicate, timeout=None):
         result = predicate()
         if result:
index d0a5446cfea2c67f8bb5fc77a56b1cffa863df82..d83b5a7b8dc4d3fe9d1f7237bd8f1a2023527244 100644 (file)
@@ -948,6 +948,17 @@ class _TestCondition(BaseTestCase):
         woken.release()
         cond.release()
 
+    def assertReachesEventually(self, func, value):
+        for i in range(10):
+            try:
+                if func() == value:
+                    break
+            except NotImplementedError:
+                break
+            time.sleep(DELTA)
+        time.sleep(DELTA)
+        self.assertReturnsIfImplemented(value, func)
+
     def check_invariant(self, cond):
         # this is only supposed to succeed when there are no sleepers
         if self.TYPE == 'processes':
@@ -1055,13 +1066,54 @@ class _TestCondition(BaseTestCase):
         cond.release()
 
         # check they have all woken
-        for i in range(10):
-            try:
-                if get_value(woken) == 6:
-                    break
-            except NotImplementedError:
-                break
-            time.sleep(DELTA)
+        self.assertReachesEventually(lambda: get_value(woken), 6)
+
+        # check state is not mucked up
+        self.check_invariant(cond)
+
+    def test_notify_n(self):
+        cond = self.Condition()
+        sleeping = self.Semaphore(0)
+        woken = self.Semaphore(0)
+
+        # start some threads/processes
+        for i in range(3):
+            p = self.Process(target=self.f, args=(cond, sleeping, woken))
+            p.daemon = True
+            p.start()
+
+            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
+            t.daemon = True
+            t.start()
+
+        # wait for them to all sleep
+        for i in range(6):
+            sleeping.acquire()
+
+        # check no process/thread has woken up
+        time.sleep(DELTA)
+        self.assertReturnsIfImplemented(0, get_value, woken)
+
+        # wake some of them up
+        cond.acquire()
+        cond.notify(n=2)
+        cond.release()
+
+        # check 2 have woken
+        self.assertReachesEventually(lambda: get_value(woken), 2)
+
+        # wake the rest of them
+        cond.acquire()
+        cond.notify(n=4)
+        cond.release()
+
+        self.assertReachesEventually(lambda: get_value(woken), 6)
+
+        # doesn't do anything more
+        cond.acquire()
+        cond.notify(n=3)
+        cond.release()
+
         self.assertReturnsIfImplemented(6, get_value, woken)
 
         # check state is not mucked up
diff --git a/Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst b/Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst
new file mode 100644 (file)
index 0000000..9ef3ace
--- /dev/null
@@ -0,0 +1,5 @@
+Add missing parameter "n" on multiprocessing.Condition.notify().
+
+The doc claims multiprocessing.Condition behaves like threading.Condition,
+but its notify() method lacked the optional "n" argument (to specify the
+number of sleepers to wake up) that threading.Condition.notify() accepts.