Returns list of those objects in object_list which are ready/readable.
'''
+ object_list = list(object_list)
+
+ if not object_list:
+ if timeout is None:
+ while True:
+ time.sleep(1e6)
+ elif timeout > 0:
+ time.sleep(timeout)
+ return []
+
if timeout is None:
timeout = INFINITE
elif timeout < 0:
timeout = 0
else:
timeout = int(timeout * 1000 + 0.5)
-
- object_list = list(object_list)
waithandle_to_obj = {}
ov_list = []
ready_objects = set()
self.assertRaises(OSError, a.recv)
self.assertRaises(OSError, b.recv)
+ @warnings_helper.ignore_fork_in_thread_deprecation_warnings()
+ def test_wait_empty(self):
+ if self.TYPE != 'processes':
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+ # gh-145587: wait() with empty list should respect timeout
+ timeout = 0.5
+ start = time.monotonic()
+ res = self.connection.wait([], timeout=timeout)
+ duration = time.monotonic() - start
+
+ self.assertEqual(res, [])
+ self.assertGreaterEqual(duration, timeout - 0.1)
+
class _TestListener(BaseTestCase):
ALLOWED_TYPES = ('processes',)