]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
Fix issue 10527: make multiprocessing use poll() instead of select() if available.
authorGiampaolo Rodola' <g.rodola@gmail.com>
Mon, 31 Dec 2012 16:38:17 +0000 (17:38 +0100)
committerGiampaolo Rodola' <g.rodola@gmail.com>
Mon, 31 Dec 2012 16:38:17 +0000 (17:38 +0100)
1  2 
Lib/multiprocessing/connection.py
Lib/test/test_multiprocessing.py
Misc/NEWS

index fbbd5d91d39047a2a43811f5038a8267c29a066a,f083c54d2a64818cb18adf0e1ff7e19937662ebc..2ae8a811f16b6b084e51276fbafa7572e2c834d3
@@@ -509,7 -213,30 +509,28 @@@ if sys.platform != 'win32'
          return c1, c2
  
  else:
+     if hasattr(select, 'poll'):
+         def _poll(fds, timeout):
+             if timeout is not None:
+                 timeout = int(timeout) * 1000  # timeout is in milliseconds
+             fd_map = {}
+             pollster = select.poll()
+             for fd in fds:
+                 pollster.register(fd, select.POLLIN)
+                 if hasattr(fd, 'fileno'):
+                     fd_map[fd.fileno()] = fd
+                 else:
+                     fd_map[fd] = fd
+             ls = []
+             for fd, event in pollster.poll(timeout):
+                 if event & select.POLLNVAL:
+                     raise ValueError('invalid file descriptor %i' % fd)
+                 ls.append(fd_map[fd])
+             return ls
+     else:
+         def _poll(fds, timeout):
+             return select.select(fds, [], [], timeout)[0]
  
 -    from _multiprocessing import win32
 -
      def Pipe(duplex=True):
          '''
          Returns pair of connection objects at either end of a pipe
index b2a964cec651554321808c6b3c49eb3ac8b23e9c,fa4865b184f01d281fce5521778dbc53b68fb973..533dbacc8f77cd3c2c7783c2910789d684b24fef
@@@ -3107,180 -2381,6 +3108,181 @@@ class TestStdinBadfiledescriptor(unitte
          assert sio.getvalue() == 'foo'
  
  
-         self.assertEqual(res, [p.sentinel, b])
 +class TestWait(unittest.TestCase):
 +
 +    @classmethod
 +    def _child_test_wait(cls, w, slow):
 +        for i in range(10):
 +            if slow:
 +                time.sleep(random.random()*0.1)
 +            w.send((i, os.getpid()))
 +        w.close()
 +
 +    def test_wait(self, slow=False):
 +        from multiprocessing.connection import wait
 +        readers = []
 +        procs = []
 +        messages = []
 +
 +        for i in range(4):
 +            r, w = multiprocessing.Pipe(duplex=False)
 +            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
 +            p.daemon = True
 +            p.start()
 +            w.close()
 +            readers.append(r)
 +            procs.append(p)
 +            self.addCleanup(p.join)
 +
 +        while readers:
 +            for r in wait(readers):
 +                try:
 +                    msg = r.recv()
 +                except EOFError:
 +                    readers.remove(r)
 +                    r.close()
 +                else:
 +                    messages.append(msg)
 +
 +        messages.sort()
 +        expected = sorted((i, p.pid) for i in range(10) for p in procs)
 +        self.assertEqual(messages, expected)
 +
 +    @classmethod
 +    def _child_test_wait_socket(cls, address, slow):
 +        s = socket.socket()
 +        s.connect(address)
 +        for i in range(10):
 +            if slow:
 +                time.sleep(random.random()*0.1)
 +            s.sendall(('%s\n' % i).encode('ascii'))
 +        s.close()
 +
 +    def test_wait_socket(self, slow=False):
 +        from multiprocessing.connection import wait
 +        l = socket.socket()
 +        l.bind(('', 0))
 +        l.listen(4)
 +        addr = ('localhost', l.getsockname()[1])
 +        readers = []
 +        procs = []
 +        dic = {}
 +
 +        for i in range(4):
 +            p = multiprocessing.Process(target=self._child_test_wait_socket,
 +                                        args=(addr, slow))
 +            p.daemon = True
 +            p.start()
 +            procs.append(p)
 +            self.addCleanup(p.join)
 +
 +        for i in range(4):
 +            r, _ = l.accept()
 +            readers.append(r)
 +            dic[r] = []
 +        l.close()
 +
 +        while readers:
 +            for r in wait(readers):
 +                msg = r.recv(32)
 +                if not msg:
 +                    readers.remove(r)
 +                    r.close()
 +                else:
 +                    dic[r].append(msg)
 +
 +        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
 +        for v in dic.values():
 +            self.assertEqual(b''.join(v), expected)
 +
 +    def test_wait_slow(self):
 +        self.test_wait(True)
 +
 +    def test_wait_socket_slow(self):
 +        self.test_wait_socket(True)
 +
 +    def test_wait_timeout(self):
 +        from multiprocessing.connection import wait
 +
 +        expected = 5
 +        a, b = multiprocessing.Pipe()
 +
 +        start = time.time()
 +        res = wait([a, b], expected)
 +        delta = time.time() - start
 +
 +        self.assertEqual(res, [])
 +        self.assertLess(delta, expected * 2)
 +        self.assertGreater(delta, expected * 0.5)
 +
 +        b.send(None)
 +
 +        start = time.time()
 +        res = wait([a, b], 20)
 +        delta = time.time() - start
 +
 +        self.assertEqual(res, [a])
 +        self.assertLess(delta, 0.4)
 +
 +    @classmethod
 +    def signal_and_sleep(cls, sem, period):
 +        sem.release()
 +        time.sleep(period)
 +
 +    def test_wait_integer(self):
 +        from multiprocessing.connection import wait
 +
 +        expected = 3
++        sorted_ = lambda l: sorted(l, key=lambda x: isinstance(x, int))
 +        sem = multiprocessing.Semaphore(0)
 +        a, b = multiprocessing.Pipe()
 +        p = multiprocessing.Process(target=self.signal_and_sleep,
 +                                    args=(sem, expected))
 +
 +        p.start()
 +        self.assertIsInstance(p.sentinel, int)
 +        self.assertTrue(sem.acquire(timeout=20))
 +
 +        start = time.time()
 +        res = wait([a, p.sentinel, b], expected + 20)
 +        delta = time.time() - start
 +
 +        self.assertEqual(res, [p.sentinel])
 +        self.assertLess(delta, expected + 2)
 +        self.assertGreater(delta, expected - 2)
 +
 +        a.send(None)
 +
 +        start = time.time()
 +        res = wait([a, p.sentinel, b], 20)
 +        delta = time.time() - start
 +
-         self.assertEqual(res, [a, p.sentinel, b])
++        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
 +        self.assertLess(delta, 0.4)
 +
 +        b.send(None)
 +
 +        start = time.time()
 +        res = wait([a, p.sentinel, b], 20)
 +        delta = time.time() - start
 +
++        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
 +        self.assertLess(delta, 0.4)
 +
 +        p.terminate()
 +        p.join()
 +
 +    def test_neg_timeout(self):
 +        from multiprocessing.connection import wait
 +        a, b = multiprocessing.Pipe()
 +        t = time.time()
 +        res = wait([a], timeout=-1)
 +        t = time.time() - t
 +        self.assertEqual(res, [])
 +        self.assertLess(t, 1)
 +        a.close()
 +        b.close()
 +
  #
  # Issue 14151: Test invalid family on invalid environment
  #
diff --cc Misc/NEWS
index c2bfa00148e8c7a2097fffac9b1d06458bc0f5ff,9e04951694017af86b4073a3f6780e73cfef30b4..c2c1c336e75b411fad8192139f2647772d361a74
+++ b/Misc/NEWS
@@@ -124,9 -189,8 +124,11 @@@ Core and Builtin
  Library
  -------
  
+ - Issue 10527: make multiprocessing use poll() instead of select() if available.
 +- Issue #16688: Fix backreferences did make case-insensitive regex fail on
 +  non-ASCII strings. Patch by Matthew Barnett.
 +
  - Issue #16485: Fix file descriptor not being closed if file header patching
    fails on closing of aifc file.