From: Giampaolo Rodola' Date: Mon, 31 Dec 2012 16:38:17 +0000 (+0100) Subject: Fix issue 10527: make multiprocessing use poll() instead of select() if available. X-Git-Tag: v3.3.1rc1~451 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=5051ca887c82506a9dcf8b71e0e6890f9e0e1589;p=thirdparty%2FPython%2Fcpython.git Fix issue 10527: make multiprocessing use poll() instead of select() if available. --- 5051ca887c82506a9dcf8b71e0e6890f9e0e1589 diff --cc Lib/multiprocessing/connection.py index fbbd5d91d390,f083c54d2a64..2ae8a811f16b --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@@ -509,7 -213,30 +509,28 @@@ if sys.platform != 'win32' return c1, c2 else: + if hasattr(select, 'poll'): + def _poll(fds, timeout): + if timeout is not None: + timeout = int(timeout) * 1000 # timeout is in milliseconds + fd_map = {} + pollster = select.poll() + for fd in fds: + pollster.register(fd, select.POLLIN) + if hasattr(fd, 'fileno'): + fd_map[fd.fileno()] = fd + else: + fd_map[fd] = fd + ls = [] + for fd, event in pollster.poll(timeout): + if event & select.POLLNVAL: + raise ValueError('invalid file descriptor %i' % fd) + ls.append(fd_map[fd]) + return ls + else: + def _poll(fds, timeout): + return select.select(fds, [], [], timeout)[0] - from _multiprocessing import win32 - def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe diff --cc Lib/test/test_multiprocessing.py index b2a964cec651,fa4865b184f0..533dbacc8f77 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@@ -3107,180 -2381,6 +3108,181 @@@ class TestStdinBadfiledescriptor(unitte assert sio.getvalue() == 'foo' +class TestWait(unittest.TestCase): + + @classmethod + def _child_test_wait(cls, w, slow): + for i in range(10): + if slow: + time.sleep(random.random()*0.1) + w.send((i, os.getpid())) + w.close() + + def test_wait(self, slow=False): + from multiprocessing.connection import wait + readers = [] + procs = [] + messages = [] + + for i in range(4): + r, w = multiprocessing.Pipe(duplex=False) + p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) + p.daemon = True + p.start() + w.close() + readers.append(r) + procs.append(p) + self.addCleanup(p.join) + + while readers: + for r in wait(readers): + try: + msg = r.recv() + except EOFError: + readers.remove(r) + r.close() + else: + messages.append(msg) + + messages.sort() + expected = sorted((i, p.pid) for i in range(10) for p in procs) + self.assertEqual(messages, expected) + + @classmethod + def _child_test_wait_socket(cls, address, slow): + s = socket.socket() + s.connect(address) + for i in range(10): + if slow: + time.sleep(random.random()*0.1) + s.sendall(('%s\n' % i).encode('ascii')) + s.close() + + def test_wait_socket(self, slow=False): + from multiprocessing.connection import wait + l = socket.socket() + l.bind(('', 0)) + l.listen(4) + addr = ('localhost', l.getsockname()[1]) + readers = [] + procs = [] + dic = {} + + for i in range(4): + p = multiprocessing.Process(target=self._child_test_wait_socket, + args=(addr, slow)) + p.daemon = True + p.start() + procs.append(p) + self.addCleanup(p.join) + + for i in range(4): + r, _ = l.accept() + readers.append(r) + dic[r] = [] + l.close() + + while readers: + for r in wait(readers): + msg = r.recv(32) + if not msg: + readers.remove(r) + r.close() + else: + dic[r].append(msg) + + expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') + for v in dic.values(): + self.assertEqual(b''.join(v), expected) + + def test_wait_slow(self): + self.test_wait(True) + + def test_wait_socket_slow(self): + self.test_wait_socket(True) + + def test_wait_timeout(self): + from multiprocessing.connection import wait + + expected = 5 + a, b = multiprocessing.Pipe() + + start = time.time() + res = wait([a, b], expected) + delta = time.time() - start + + self.assertEqual(res, []) + self.assertLess(delta, expected * 2) + self.assertGreater(delta, expected * 0.5) + + b.send(None) + + start = time.time() + res = wait([a, b], 20) + delta = time.time() - start + + self.assertEqual(res, [a]) + self.assertLess(delta, 0.4) + + @classmethod + def signal_and_sleep(cls, sem, period): + sem.release() + time.sleep(period) + + def test_wait_integer(self): + from multiprocessing.connection import wait + + expected = 3 ++ sorted_ = lambda l: sorted(l, key=lambda x: isinstance(x, int)) + sem = multiprocessing.Semaphore(0) + a, b = multiprocessing.Pipe() + p = multiprocessing.Process(target=self.signal_and_sleep, + args=(sem, expected)) + + p.start() + self.assertIsInstance(p.sentinel, int) + self.assertTrue(sem.acquire(timeout=20)) + + start = time.time() + res = wait([a, p.sentinel, b], expected + 20) + delta = time.time() - start + + self.assertEqual(res, [p.sentinel]) + self.assertLess(delta, expected + 2) + self.assertGreater(delta, expected - 2) + + a.send(None) + + start = time.time() + res = wait([a, p.sentinel, b], 20) + delta = time.time() - start + - self.assertEqual(res, [p.sentinel, b]) ++ self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) + self.assertLess(delta, 0.4) + + b.send(None) + + start = time.time() + res = wait([a, p.sentinel, b], 20) + delta = time.time() - start + - self.assertEqual(res, [a, p.sentinel, b]) ++ self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) + self.assertLess(delta, 0.4) + + p.terminate() + p.join() + + def test_neg_timeout(self): + from multiprocessing.connection import wait + a, b = multiprocessing.Pipe() + t = time.time() + res = wait([a], timeout=-1) + t = time.time() - t + self.assertEqual(res, []) + self.assertLess(t, 1) + a.close() + b.close() + # # Issue 14151: Test invalid family on invalid environment # diff --cc Misc/NEWS index c2bfa00148e8,9e0495169401..c2c1c336e75b --- a/Misc/NEWS +++ b/Misc/NEWS @@@ -124,9 -189,8 +124,11 @@@ Core and Builtin Library ------- + - Issue 10527: make multiprocessing use poll() instead of select() if available. + +- Issue #16688: Fix backreferences did make case-insensitive regex fail on + non-ASCII strings. Patch by Matthew Barnett. + - Issue #16485: Fix file descriptor not being closed if file header patching fails on closing of aifc file.