assert sio.getvalue() == 'foo'
- self.assertEqual(res, [p.sentinel, b])
+class TestWait(unittest.TestCase):
+
+ @classmethod
+ def _child_test_wait(cls, w, slow):
+ for i in range(10):
+ if slow:
+ time.sleep(random.random()*0.1)
+ w.send((i, os.getpid()))
+ w.close()
+
+ def test_wait(self, slow=False):
+ from multiprocessing.connection import wait
+ readers = []
+ procs = []
+ messages = []
+
+ for i in range(4):
+ r, w = multiprocessing.Pipe(duplex=False)
+ p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
+ p.daemon = True
+ p.start()
+ w.close()
+ readers.append(r)
+ procs.append(p)
+ self.addCleanup(p.join)
+
+ while readers:
+ for r in wait(readers):
+ try:
+ msg = r.recv()
+ except EOFError:
+ readers.remove(r)
+ r.close()
+ else:
+ messages.append(msg)
+
+ messages.sort()
+ expected = sorted((i, p.pid) for i in range(10) for p in procs)
+ self.assertEqual(messages, expected)
+
+ @classmethod
+ def _child_test_wait_socket(cls, address, slow):
+ s = socket.socket()
+ s.connect(address)
+ for i in range(10):
+ if slow:
+ time.sleep(random.random()*0.1)
+ s.sendall(('%s\n' % i).encode('ascii'))
+ s.close()
+
+ def test_wait_socket(self, slow=False):
+ from multiprocessing.connection import wait
+ l = socket.socket()
+ l.bind(('', 0))
+ l.listen(4)
+ addr = ('localhost', l.getsockname()[1])
+ readers = []
+ procs = []
+ dic = {}
+
+ for i in range(4):
+ p = multiprocessing.Process(target=self._child_test_wait_socket,
+ args=(addr, slow))
+ p.daemon = True
+ p.start()
+ procs.append(p)
+ self.addCleanup(p.join)
+
+ for i in range(4):
+ r, _ = l.accept()
+ readers.append(r)
+ dic[r] = []
+ l.close()
+
+ while readers:
+ for r in wait(readers):
+ msg = r.recv(32)
+ if not msg:
+ readers.remove(r)
+ r.close()
+ else:
+ dic[r].append(msg)
+
+ expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
+ for v in dic.values():
+ self.assertEqual(b''.join(v), expected)
+
+ def test_wait_slow(self):
+ self.test_wait(True)
+
+ def test_wait_socket_slow(self):
+ self.test_wait_socket(True)
+
+ def test_wait_timeout(self):
+ from multiprocessing.connection import wait
+
+ expected = 5
+ a, b = multiprocessing.Pipe()
+
+ start = time.time()
+ res = wait([a, b], expected)
+ delta = time.time() - start
+
+ self.assertEqual(res, [])
+ self.assertLess(delta, expected * 2)
+ self.assertGreater(delta, expected * 0.5)
+
+ b.send(None)
+
+ start = time.time()
+ res = wait([a, b], 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [a])
+ self.assertLess(delta, 0.4)
+
+ @classmethod
+ def signal_and_sleep(cls, sem, period):
+ sem.release()
+ time.sleep(period)
+
+ def test_wait_integer(self):
+ from multiprocessing.connection import wait
+
+ expected = 3
++ sorted_ = lambda l: sorted(l, key=lambda x: isinstance(x, int))
+ sem = multiprocessing.Semaphore(0)
+ a, b = multiprocessing.Pipe()
+ p = multiprocessing.Process(target=self.signal_and_sleep,
+ args=(sem, expected))
+
+ p.start()
+ self.assertIsInstance(p.sentinel, int)
+ self.assertTrue(sem.acquire(timeout=20))
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], expected + 20)
+ delta = time.time() - start
+
+ self.assertEqual(res, [p.sentinel])
+ self.assertLess(delta, expected + 2)
+ self.assertGreater(delta, expected - 2)
+
+ a.send(None)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], 20)
+ delta = time.time() - start
+
- self.assertEqual(res, [a, p.sentinel, b])
++ self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
+ self.assertLess(delta, 0.4)
+
+ b.send(None)
+
+ start = time.time()
+ res = wait([a, p.sentinel, b], 20)
+ delta = time.time() - start
+
++ self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
+ self.assertLess(delta, 0.4)
+
+ p.terminate()
+ p.join()
+
+ def test_neg_timeout(self):
+ from multiprocessing.connection import wait
+ a, b = multiprocessing.Pipe()
+ t = time.time()
+ res = wait([a], timeout=-1)
+ t = time.time() - t
+ self.assertEqual(res, [])
+ self.assertLess(t, 1)
+ a.close()
+ b.close()
+
#
# Issue 14151: Test invalid family on invalid environment
#