close_queue(q)
+ @support.requires_resource('walltime')
def test_many_processes(self):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
def test_wait_socket_slow(self):
self.test_wait_socket(True)
+ @support.requires_resource('walltime')
def test_wait_timeout(self):
from multiprocessing.connection import wait
sem.release()
time.sleep(period)
+ @support.requires_resource('walltime')
def test_wait_integer(self):
from multiprocessing.connection import wait
cpu - Used for certain CPU-heavy tests.
+ walltime - Long running but not CPU-bound tests.
+
subprocess Run all tests for the subprocess module.
urlfetch - It is okay to download files required on testing.
ALL_RESOURCES = ('audio', 'curses', 'largefile', 'network',
- 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui')
+ 'decimal', 'cpu', 'subprocess', 'urlfetch', 'gui', 'walltime')
# Other resources excluded from --use=all:
#
--- /dev/null
+import threading
+import time
+import weakref
+from concurrent import futures
+from test import support
+
+
+def mul(x, y):
+ return x * y
+
+def capture(*args, **kwargs):
+ return args, kwargs
+
+
+class MyObject(object):
+ def my_method(self):
+ pass
+
+
+def make_dummy_object(_):
+ return MyObject()
+
+
+class ExecutorTest:
+ # Executor.shutdown() and context manager usage is tested by
+ # ExecutorShutdownTest.
+ def test_submit(self):
+ future = self.executor.submit(pow, 2, 8)
+ self.assertEqual(256, future.result())
+
+ def test_submit_keyword(self):
+ future = self.executor.submit(mul, 2, y=8)
+ self.assertEqual(16, future.result())
+ future = self.executor.submit(capture, 1, self=2, fn=3)
+ self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
+ with self.assertRaises(TypeError):
+ self.executor.submit(fn=capture, arg=1)
+ with self.assertRaises(TypeError):
+ self.executor.submit(arg=1)
+
+ def test_map(self):
+ self.assertEqual(
+ list(self.executor.map(pow, range(10), range(10))),
+ list(map(pow, range(10), range(10))))
+
+ self.assertEqual(
+ list(self.executor.map(pow, range(10), range(10), chunksize=3)),
+ list(map(pow, range(10), range(10))))
+
+ def test_map_exception(self):
+ i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
+ self.assertEqual(i.__next__(), (0, 1))
+ self.assertEqual(i.__next__(), (0, 1))
+ self.assertRaises(ZeroDivisionError, i.__next__)
+
+ @support.requires_resource('walltime')
+ def test_map_timeout(self):
+ results = []
+ try:
+ for i in self.executor.map(time.sleep,
+ [0, 0, 6],
+ timeout=5):
+ results.append(i)
+ except futures.TimeoutError:
+ pass
+ else:
+ self.fail('expected TimeoutError')
+
+ self.assertEqual([None, None], results)
+
+ def test_shutdown_race_issue12456(self):
+ # Issue #12456: race condition at shutdown where trying to post a
+ # sentinel in the call queue blocks (the queue is full while processes
+ # have exited).
+ self.executor.map(str, [2] * (self.worker_count + 1))
+ self.executor.shutdown()
+
+ @support.cpython_only
+ def test_no_stale_references(self):
+ # Issue #16284: check that the executors don't unnecessarily hang onto
+ # references.
+ my_object = MyObject()
+ my_object_collected = threading.Event()
+ my_object_callback = weakref.ref(
+ my_object, lambda obj: my_object_collected.set())
+ # Deliberately discarding the future.
+ self.executor.submit(my_object.my_method)
+ del my_object
+
+ collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
+ self.assertTrue(collected,
+ "Stale reference not collected within timeout.")
+
+ def test_max_workers_negative(self):
+ for number in (0, -1):
+ with self.assertRaisesRegex(ValueError,
+ "max_workers must be greater "
+ "than 0"):
+ self.executor_type(max_workers=number)
+
+ def test_free_reference(self):
+ # Issue #14406: Result iterator should not keep an internal
+ # reference to result objects.
+ for obj in self.executor.map(make_dummy_object, range(10)):
+ wr = weakref.ref(obj)
+ del obj
+ support.gc_collect() # For PyPy or other GCs.
+ self.assertIsNone(wr())
--- /dev/null
+import sys
+import threading
+import time
+import unittest
+from concurrent import futures
+from test import support
+
+from .util import (
+ CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ create_executor_tests, setup_module,
+ BaseTestCase, ThreadPoolMixin,
+ ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
+
+
+def mul(x, y):
+ return x * y
+
+def sleep_and_raise(t):
+ time.sleep(t)
+ raise Exception('this is an exception')
+
+
+class WaitTests:
+ def test_20369(self):
+ # See https://bugs.python.org/issue20369
+ future = self.executor.submit(time.sleep, 1.5)
+ done, not_done = futures.wait([future, future],
+ return_when=futures.ALL_COMPLETED)
+ self.assertEqual({future}, done)
+ self.assertEqual(set(), not_done)
+
+
+ def test_first_completed(self):
+ future1 = self.executor.submit(mul, 21, 2)
+ future2 = self.executor.submit(time.sleep, 1.5)
+
+ done, not_done = futures.wait(
+ [CANCELLED_FUTURE, future1, future2],
+ return_when=futures.FIRST_COMPLETED)
+
+ self.assertEqual(set([future1]), done)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
+
+ def test_first_completed_some_already_completed(self):
+ future1 = self.executor.submit(time.sleep, 1.5)
+
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
+ return_when=futures.FIRST_COMPLETED)
+
+ self.assertEqual(
+ set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
+ finished)
+ self.assertEqual(set([future1]), pending)
+
+ @support.requires_resource('walltime')
+ def test_first_exception(self):
+ future1 = self.executor.submit(mul, 2, 21)
+ future2 = self.executor.submit(sleep_and_raise, 1.5)
+ future3 = self.executor.submit(time.sleep, 3)
+
+ finished, pending = futures.wait(
+ [future1, future2, future3],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEqual(set([future1, future2]), finished)
+ self.assertEqual(set([future3]), pending)
+
+ def test_first_exception_some_already_complete(self):
+ future1 = self.executor.submit(divmod, 21, 0)
+ future2 = self.executor.submit(time.sleep, 1.5)
+
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1, future2],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
+
+ def test_first_exception_one_already_failed(self):
+ future1 = self.executor.submit(time.sleep, 2)
+
+ finished, pending = futures.wait(
+ [EXCEPTION_FUTURE, future1],
+ return_when=futures.FIRST_EXCEPTION)
+
+ self.assertEqual(set([EXCEPTION_FUTURE]), finished)
+ self.assertEqual(set([future1]), pending)
+
+ def test_all_completed(self):
+ future1 = self.executor.submit(divmod, 2, 0)
+ future2 = self.executor.submit(mul, 2, 21)
+
+ finished, pending = futures.wait(
+ [SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2],
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([SUCCESSFUL_FUTURE,
+ CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ future1,
+ future2]), finished)
+ self.assertEqual(set(), pending)
+
+ @support.requires_resource('walltime')
+ def test_timeout(self):
+ future1 = self.executor.submit(mul, 6, 7)
+ future2 = self.executor.submit(time.sleep, 6)
+
+ finished, pending = futures.wait(
+ [CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1, future2],
+ timeout=5,
+ return_when=futures.ALL_COMPLETED)
+
+ self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+ EXCEPTION_FUTURE,
+ SUCCESSFUL_FUTURE,
+ future1]), finished)
+ self.assertEqual(set([future2]), pending)
+
+
+class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
+
+ def test_pending_calls_race(self):
+ # Issue #14406: multi-threaded race condition when waiting on all
+ # futures.
+ event = threading.Event()
+ def future_func():
+ event.wait()
+ oldswitchinterval = sys.getswitchinterval()
+ sys.setswitchinterval(1e-6)
+ try:
+ fs = {self.executor.submit(future_func) for i in range(100)}
+ event.set()
+ futures.wait(fs, return_when=futures.ALL_COMPLETED)
+ finally:
+ sys.setswitchinterval(oldswitchinterval)
+
+
+create_executor_tests(globals(), WaitTests,
+ executor_mixins=(ProcessPoolForkMixin,
+ ProcessPoolForkserverMixin,
+ ProcessPoolSpawnMixin))
+
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()
class EINTRTests(unittest.TestCase):
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
+ @support.requires_resource('walltime')
def test_all(self):
# Run the tester in a sub-process, to make sure there is only one
# thread (for reliable signal delivery).
with tempfile.TemporaryFile('wb+') as fp:
self.check_dump_traceback_later(fd=fp.fileno())
+ @support.requires_resource('walltime')
def test_dump_traceback_later_twice(self):
self.check_dump_traceback_later(loops=2)
h.close()
self.assertIn('nginx', server_string)
+ @support.requires_resource('walltime')
def test_networked_bad_cert(self):
# We feed a "CA" cert that is unrelated to the server's cert
import ssl
import socket
from test.support import (verbose,
- run_with_tz, run_with_locale, cpython_only,
+ run_with_tz, run_with_locale, cpython_only, requires_resource,
requires_working_socket)
from test.support import hashlib_helper
from test.support import threading_helper
with self.imap_class(*server.server_address):
pass
+ @requires_resource('walltime')
def test_imaplib_timeout_test(self):
_, server = self._setup(SimpleIMAPHandler)
addr = server.server_address[1]
imap_class = IMAP4_SSL
server_class = SecureTCPServer
+ @requires_resource('walltime')
def test_ssl_raises(self):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED)
ssl_context=ssl_context)
client.shutdown()
+ @requires_resource('walltime')
def test_ssl_verified(self):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(CAFILE)
self.assertFalse(err.strip('.!'))
@threading_helper.requires_working_threading()
+ @support.requires_resource('walltime')
def test_daemon_threads_shutdown_stdout_deadlock(self):
self.check_daemon_threads_shutdown_deadlock('stdout')
@threading_helper.requires_working_threading()
+ @support.requires_resource('walltime')
def test_daemon_threads_shutdown_stderr_deadlock(self):
self.check_daemon_threads_shutdown_deadlock('stderr')
os.close(r)
@requires_alarm
+ @support.requires_resource('walltime')
def test_interrupted_read_retry_buffered(self):
self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
mode="rb")
@requires_alarm
+ @support.requires_resource('walltime')
def test_interrupted_read_retry_text(self):
self.check_interrupted_read_retry(lambda x: x,
mode="r", encoding="latin1")
raise
@requires_alarm
+ @support.requires_resource('walltime')
def test_interrupted_write_retry_buffered(self):
self.check_interrupted_write_retry(b"x", mode="wb")
@requires_alarm
+ @support.requires_resource('walltime')
def test_interrupted_write_retry_text(self):
self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
support.is_emscripten, "Emscripten cannot fstat unlinked files."
)
@threading_helper.requires_working_threading()
+ @support.requires_resource('walltime')
def test_race(self):
# Issue #14632 refers.
def remove_loop(fname, tries):
import time
import unittest
from test.support import (
- cpython_only, requires_subprocess, requires_working_socket
+ cpython_only, requires_subprocess, requires_working_socket, requires_resource
)
from test.support import threading_helper
from test.support.os_helper import TESTFN
# select(), modified to use poll() instead.
@requires_subprocess()
+ @requires_resource('walltime')
def test_poll2(self):
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
interrupted = self.readpipe_interrupted(True)
self.assertTrue(interrupted)
+ @support.requires_resource('walltime')
def test_siginterrupt_off(self):
# If a signal handler is installed and siginterrupt is called with
# a false value for the second argument, when that signal arrives, it
server.ehlo()
server.quit()
+ @support.requires_resource('walltime')
def test_connect_using_sslcontext(self):
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'Needs IPv6')
+ @support.requires_resource('walltime')
def test_get_server_certificate_ipv6(self):
with socket_helper.transient_internet('ipv6.google.com'):
_test_get_server_certificate(self, 'ipv6.google.com', 443)
class ThreadedTests(unittest.TestCase):
+ @support.requires_resource('walltime')
def test_echo(self):
"""Basic test of an SSL client connecting to a server"""
if support.verbose:
self.assertIn('stdin', c.exception.args[0])
self.assertIn('input', c.exception.args[0])
+ @support.requires_resource('walltime')
def test_check_output_timeout(self):
# check_output() function with timeout arg
with self.assertRaises(subprocess.TimeoutExpired) as c:
self.assertIn('stdin', c.exception.args[0])
self.assertIn('input', c.exception.args[0])
+ @support.requires_resource('walltime')
def test_check_output_timeout(self):
with self.assertRaises(subprocess.TimeoutExpired) as c:
cp = self.run_python((
# XXX The rest of these tests aren't very good -- they don't check much.
# They do sometimes catch some major disasters, though.
+ @support.requires_resource('walltime')
def test_ftp(self):
# Testing the same URL twice exercises the caching in CacheFTPHandler
urls = [
self.assertEqual(res.geturl(),
"http://www.pythontest.net/index.html#frag")
+ @support.requires_resource('walltime')
def test_redirect_url_withfrag(self):
redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
with socket_helper.transient_internet(redirect_url_with_frag):
FTP_HOST = 'ftp://www.pythontest.net/'
+ @support.requires_resource('walltime')
def test_ftp_basic(self):
self.assertIsNone(socket.getdefaulttimeout())
with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
socket.setdefaulttimeout(None)
self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
+ @support.requires_resource('walltime')
def test_ftp_no_timeout(self):
self.assertIsNone(socket.getdefaulttimeout())
with socket_helper.transient_internet(self.FTP_HOST):
socket.setdefaulttimeout(None)
self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
+ @support.requires_resource('walltime')
def test_ftp_timeout(self):
with socket_helper.transient_internet(self.FTP_HOST):
u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
open_url.close()
self.assertEqual(code, 404)
+ @support.requires_resource('walltime')
def test_bad_address(self):
# Make sure proper exception is raised when connecting to a bogus
# address.
logo = "http://www.pythontest.net/"
+ @support.requires_resource('walltime')
def test_data_header(self):
with self.urlretrieve(self.logo) as (file_location, fileheaders):
datevalue = fileheaders.get('Date')
self.assertEqual(p.add(6,8), 6+8)
self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8)
+ @support.requires_resource('walltime')
def test_path3(self):
p = xmlrpclib.ServerProxy(URL+"/is/broken")
self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
+ @support.requires_resource('walltime')
def test_invalid_path(self):
p = xmlrpclib.ServerProxy(URL+"/invalid")
self.assertRaises(xmlrpclib.Fault, p.add, 6, 8)
+ @support.requires_resource('walltime')
def test_path_query_fragment(self):
p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag")
self.assertEqual(p.test(), "/foo?k=v#frag")
+ @support.requires_resource('walltime')
def test_path_fragment(self):
p = xmlrpclib.ServerProxy(URL+"/foo#frag")
self.assertEqual(p.test(), "/foo#frag")
+ @support.requires_resource('walltime')
def test_path_query(self):
p = xmlrpclib.ServerProxy(URL+"/foo?k=v")
self.assertEqual(p.test(), "/foo?k=v")
+ @support.requires_resource('walltime')
def test_empty_path(self):
p = xmlrpclib.ServerProxy(URL)
self.assertEqual(p.test(), "/RPC2")
+ @support.requires_resource('walltime')
def test_root_path(self):
p = xmlrpclib.ServerProxy(URL + "/")
self.assertEqual(p.test(), "/")
+ @support.requires_resource('walltime')
def test_empty_path_query(self):
p = xmlrpclib.ServerProxy(URL + "?k=v")
self.assertEqual(p.test(), "?k=v")
+ @support.requires_resource('walltime')
def test_empty_path_fragment(self):
p = xmlrpclib.ServerProxy(URL + "#frag")
self.assertEqual(p.test(), "#frag")