from test import support
from test.support import os_helper
from test.support.script_helper import assert_python_ok
+from test.support import threading_helper
# http://bugs.python.org/issue4373
# Don't load the xx module more than once.
self.assertIn(lib, cmd.rpath)
self.assertIn(incl, cmd.include_dirs)
+ @threading_helper.requires_working_threading()
def test_optional_extension(self):
# this extension will fail, but let's ignore this failure
from test.libregrtest.utils import removepy, count, format_duration, printlist
from test import support
from test.support import os_helper
+from test.support import threading_helper
# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
except SystemExit as exc:
# bpo-38203: Python can hang at exit in Py_Finalize(), especially
# on threading._shutdown() call: put a timeout
- faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
+ if threading_helper.can_start_thread:
+ faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
sys.exit(exc.code)
self.check_unpickling_error(self.truncated_errors, p)
@threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
def test_unpickle_module_race(self):
# https://bugs.python.org/issue34572
locker_module = dedent("""
import sys
import threading
import time
+import unittest
from test import support
def _can_start_thread() -> bool:
- """Detect if Python can start new threads.
+ """Detect whether Python can start new threads.
Some WebAssembly platforms do not provide a working pthread
    implementation. Thread support is stubbed and any attempt
    to create a new thread fails.
return True
can_start_thread = _can_start_thread()
+
+def requires_working_threading(*, module=False):
+ """Skip tests or modules that require working threading.
+
+ Can be used as a function/class decorator or to skip an entire module.
+ """
+ msg = "requires threading support"
+ if module:
+ if not can_start_thread:
+ raise unittest.SkipTest(msg)
+ else:
+ return unittest.skipUnless(can_start_thread, msg)
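A minimal usage sketch of the new helper, for illustration only (the class and test names below are hypothetical, not part of the patch). The decorator form wraps unittest.skipUnless(can_start_thread, ...); with module=True the function returns nothing and instead raises unittest.SkipTest at import time, so the whole test file is reported as skipped:

    import unittest
    from test.support import threading_helper

    # Module-level form: call once at import time; on platforms without
    # working threads this raises unittest.SkipTest for the whole file.
    threading_helper.requires_working_threading(module=True)


    @threading_helper.requires_working_threading()  # decorator form
    class ExampleThreadTests(unittest.TestCase):    # hypothetical class
        def test_uses_threads(self):
            ...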
else:
self.fail("1/0 didn't raise an exception")
+ @threading_helper.requires_working_threading()
def testThreading(self):
# Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
data = b"1" * 2**20
if False and support.verbose:
print("(%i)"%(len(l),))
+ @threading_helper.requires_working_threading()
def test_pendingcalls_threaded(self):
#do every callback on a separate thread
class TestThreadState(unittest.TestCase):
@threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
def test_thread_state(self):
# some extra thread-state tests driven via _testcapi
def target():
import time
import unittest
import weakref
+from test.support import threading_helper
try:
from _testcapi import hamt
ctx1.run(ctx1_fun)
@isolated_context
+ @threading_helper.requires_working_threading()
def test_context_threads_1(self):
cvar = contextvars.ContextVar('cvar')
run_with_locale, cpython_only,
darwin_malloc_err_warning)
from test.support.import_helper import import_fresh_module
+from test.support import threading_helper
from test.support import warnings_helper
import random
import inspect
for sig in Overflow, Underflow, DivisionByZero, InvalidOperation:
cls.assertFalse(thiscontext.flags[sig])
+
+@threading_helper.requires_working_threading()
class ThreadingTest(unittest.TestCase):
'''Unit tests for thread local contexts in Decimal.'''
addrs = utils.getaddresses([Header('Al Person <aperson@dom.ain>')])
self.assertEqual(addrs[0][1], 'aperson@dom.ain')
+ @threading_helper.requires_working_threading()
def test_make_msgid_collisions(self):
# Test make_msgid uniqueness, even with multiple threads
class MsgidsThread(Thread):
self.assertEqual(str(Color.BLUE), 'blue')
@threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
def test_unique_composite(self):
# override __eq__ to be identity only
class TestFlag(Flag):
self.assertEqual(str(Color.BLUE), 'blue')
@threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
def test_unique_composite(self):
# override __eq__ to be identity only
class TestFlag(IntFlag):
for attr in self.module.WRAPPER_ASSIGNMENTS:
self.assertEqual(getattr(g, attr), getattr(f, attr))
+ @threading_helper.requires_working_threading()
def test_lru_cache_threaded(self):
n, m = 5, 11
def orig(x, y):
finally:
sys.setswitchinterval(orig_si)
+ @threading_helper.requires_working_threading()
def test_lru_cache_threaded2(self):
# Simultaneous call with the same arguments
n, m = 5, 7
pause.reset()
self.assertEqual(f.cache_info(), (0, (i+1)*n, m*n, i+1))
+ @threading_helper.requires_working_threading()
def test_lru_cache_threaded3(self):
@self.module.lru_cache(maxsize=2)
def f(x):
self.assertEqual(item.get_cost(), 4)
self.assertEqual(item.cached_cost, 3)
+ @threading_helper.requires_working_threading()
def test_threaded(self):
go = threading.Event()
item = CachedCostItemWait(go)
v = {1: v, 2: Ouch()}
gc.disable()
+ @threading_helper.requires_working_threading()
def test_trashcan_threads(self):
# Issue #13992: trashcan mechanism should be thread-safe
NESTING = 60
)
@threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
def test_threaded_hashing(self):
# Updating the same hash object from several threads at once
# using data chunk sizes containing the same byte sequences.
with self.assertRaises(AttributeError):
os.does_not_exist
+ @threading_helper.requires_working_threading()
def test_concurrency(self):
# bpo 38091: this is a hack to slow down the code that calls
# has_deadlock(); the logic was itself sometimes deadlocking.
from test import lock_tests
+threading_helper.requires_working_threading(module=True)
+
+
class ModuleLockAsRLockTests:
locktype = classmethod(lambda cls: cls.LockType("some_lock"))
if __name__ == '__main__':
- unittets.main()
+ unittest.main()
from test.support.os_helper import (TESTFN, unlink, rmtree)
from test.support import script_helper, threading_helper
+threading_helper.requires_working_threading(module=True)
+
def task(N, done, done_tasks, errors):
try:
# We don't use modulefinder but still import it in order to stress
self.assertEqual(b"abcdefg", bufio.read())
@support.requires_resource('cpu')
+ @threading_helper.requires_working_threading()
def test_threads(self):
try:
# Write out many bytes with exactly the same number of 0's,
self.assertEqual(f.tell(), buffer_size + 2)
@support.requires_resource('cpu')
+ @threading_helper.requires_working_threading()
def test_threads(self):
try:
# Write out many bytes from many threads and test they were
self.assertRaises(OSError, b.close) # exception not swallowed
self.assertTrue(b.closed)
+ @threading_helper.requires_working_threading()
def test_slow_close_from_thread(self):
# Issue #31976
rawio = self.SlowFlushRawIO()
self.assertEqual(f.errors, "replace")
@support.no_tracing
+ @threading_helper.requires_working_threading()
def test_threads_write(self):
# Issue6750: concurrent writes could duplicate data
event = threading.Event()
else:
self.assertFalse(err.strip('.!'))
+ @threading_helper.requires_working_threading()
def test_daemon_threads_shutdown_stdout_deadlock(self):
self.check_daemon_threads_shutdown_deadlock('stdout')
+ @threading_helper.requires_working_threading()
def test_daemon_threads_shutdown_stderr_deadlock(self):
self.check_daemon_threads_shutdown_deadlock('stderr')
import doctest
import unittest
from test import support
+from test.support import threading_helper
from itertools import *
import weakref
from decimal import Decimal
with self.assertRaisesRegex(RuntimeError, "tee"):
next(a)
+ @threading_helper.requires_working_threading()
def test_tee_concurrent(self):
start = threading.Event()
finish = threading.Event()
@unittest.skipIf(
support.is_emscripten, "Emscripten cannot fstat unlinked files."
)
+ @threading_helper.requires_working_threading()
def test_race(self):
# Issue #14632 refers.
def remove_loop(fname, tries):
# This helps ensure that when fork exists (the important concept) that the
# register_at_fork mechanism is also present and used.
@support.requires_fork()
+ @threading_helper.requires_working_threading()
def test_post_fork_child_no_deadlock(self):
"""Ensure child logging locks are not held; bpo-6721 & bpo-36533."""
class _OurHandler(logging.Handler):
# - end of server_helper section
@support.requires_working_socket()
+@threading_helper.requires_working_threading()
class SMTPHandlerTest(BaseTest):
# bpo-14314, bpo-19665, bpo-34092: don't wait forever
TIMEOUT = support.LONG_TIMEOUT
# assert that no new lines have been added
self.assert_log_lines(lines) # no change
+ @threading_helper.requires_working_threading()
def test_race_between_set_target_and_flush(self):
class MockRaceConditionHandler:
def __init__(self, mem_hdlr):
@support.requires_working_socket()
+@threading_helper.requires_working_threading()
class SocketHandlerTest(BaseTest):
"""Test for SocketHandler objects."""
os_helper.unlink(self.address)
@support.requires_working_socket()
+@threading_helper.requires_working_threading()
class DatagramHandlerTest(BaseTest):
"""Test for DatagramHandler."""
os_helper.unlink(self.address)
@support.requires_working_socket()
+@threading_helper.requires_working_threading()
class SysLogHandlerTest(BaseTest):
"""Test for SysLogHandler using UDP."""
super(IPv6SysLogHandlerTest, self).tearDown()
@support.requires_working_socket()
+@threading_helper.requires_working_threading()
class HTTPHandlerTest(BaseTest):
"""Test for HTTPHandler."""
])
+@threading_helper.requires_working_threading()
class QueueHandlerTest(BaseTest):
# Do not bother with a logger name group.
expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
import multiprocessing
from unittest.mock import patch
+ @threading_helper.requires_working_threading()
class QueueListenerTest(BaseTest):
"""
Tests based on patch submitted for issue #27930. Ensure that
self.fail("trigger thread ended but event never set")
+@threading_helper.requires_working_threading()
class BaseQueueTestMixin(BlockingTestMixin):
def setUp(self):
self.cum = 0
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception): pass
+
+@threading_helper.requires_working_threading()
class FailingQueueTest(BlockingTestMixin):
def setUp(self):
return
results.append(val)
+ @threading_helper.requires_working_threading()
def run_threads(self, n_threads, q, inputs, feed_func, consume_func):
results = []
sentinel = None
scheduler.run()
self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
+ @threading_helper.requires_working_threading()
def test_enter_concurrent(self):
q = queue.Queue()
fun = q.put
scheduler.run()
self.assertEqual(l, [0.02, 0.03, 0.04])
+ @threading_helper.requires_working_threading()
def test_cancel_concurrent(self):
q = queue.Queue()
fun = q.put
from test import support
from test.support import os_helper
from test.support.script_helper import assert_python_ok, spawn_python
+from test.support import threading_helper
try:
import _testcapi
except ImportError:
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
'need signal.pthread_kill()')
+ @threading_helper.requires_working_threading()
def test_pthread_kill(self):
code = """if 1:
import signal
'need signal.sigwait()')
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
+ @threading_helper.requires_working_threading()
def test_sigwait_thread(self):
# Check that calling sigwait() from a thread doesn't suspend the whole
# process. A new interpreter is spawned to avoid problems when mixing
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
+ @threading_helper.requires_working_threading()
def test_pthread_sigmask(self):
code = """if 1:
import signal
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
'need signal.pthread_kill()')
+ @threading_helper.requires_working_threading()
def test_pthread_kill_main_thread(self):
# Test that a signal can be sent to the main thread with pthread_kill()
# before any other thread has been created (see issue #12392).
@unittest.skipUnless(hasattr(signal, "SIGUSR1"),
"test needs SIGUSR1")
+ @threading_helper.requires_working_threading()
def test_stress_modifying_handlers(self):
# bpo-43406: race condition between trip_signal() and signal.signal
signum = signal.SIGUSR1
os_helper.unlink(mod_filename)
os_helper.rmtree('__pycache__')
+ @support.requires_working_socket()
def test_HOST(self):
s = socket.create_server((socket_helper.HOST, 0))
s.close()
+ @support.requires_working_socket()
def test_find_unused_port(self):
port = socket_helper.find_unused_port()
s = socket.create_server((socket_helper.HOST, port))
s.close()
+ @support.requires_working_socket()
def test_bind_port(self):
s = socket.socket()
socket_helper.bind_port(s)
# sys._current_frames() is a CPython-only gimmick.
@threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
def test_current_frames(self):
import threading
import traceback
t.join()
@threading_helper.reap_threads
+ @threading_helper.requires_working_threading()
def test_current_exceptions(self):
import threading
import traceback
for moduleName in 'builtins', '__main__', 'some_module':
with self.subTest(moduleName=moduleName):
A.B.X.__module__ = moduleName
- with test.support.captured_stderr() as stderr, \
- test.support.swap_attr(sys, 'unraisablehook',
- sys.__unraisablehook__):
+ with test.support.captured_stderr() as stderr, test.support.swap_attr(
+ sys, 'unraisablehook', sys.__unraisablehook__
+ ):
expected = self.write_unraisable_exc(
- A.B.X(), "msg", "obj");
+ A.B.X(), "msg", "obj"
+ )
report = stderr.getvalue()
self.assertIn(A.B.X.__qualname__, report)
if moduleName in ['builtins', '__main__']:
from test import lock_tests
+threading_helper.requires_working_threading(module=True)
+
NUMTASKS = 10
NUMTRIPS = 3
POLL_SLEEP = 0.010 # seconds = 10 ms
import threading
from traceback import print_exc
+threading_helper.requires_working_threading(module=True)
NUM_THREADS = 20
FILES_PER_THREAD = 50
from test import lock_tests
from test import support
+threading_helper.requires_working_threading(module=True)
# Between fork() and exec(), only async-safe functions are allowed (issues
# #12316 and #11870), and fork() from a worker thread is known to trigger
import _threading_local
+threading_helper.requires_working_threading(module=True)
+
+
class Weak(object):
pass
os.kill(process_pid, signal.SIGUSR2)
signalled_all.release()
+@threading_helper.requires_working_threading()
class ThreadSignals(unittest.TestCase):
def test_signals(self):
from test import support
from test.support import script_helper, ALWAYS_EQ
from test.support import gc_collect
+from test.support import threading_helper
# Used in ReferencesTestCase.test_ref_created_during_del() .
ref_from_del = None
dict = weakref.WeakKeyDictionary()
self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
+ @threading_helper.requires_working_threading()
def test_threaded_weak_valued_setdefault(self):
d = weakref.WeakValueDictionary()
with collect_in_thread():
self.assertIsNot(x, None) # we never put None in there!
del x
+ @threading_helper.requires_working_threading()
def test_threaded_weak_valued_pop(self):
d = weakref.WeakValueDictionary()
with collect_in_thread():
x = d.pop(10, 10)
self.assertIsNot(x, None) # we never put None in there!
+ @threading_helper.requires_working_threading()
def test_threaded_weak_valued_consistency(self):
# Issue #28427: old keys should not remove new values from
# WeakValueDictionary when collecting from another thread.
if exc:
raise exc[0]
+ @threading_helper.requires_working_threading()
def test_threaded_weak_key_dict_copy(self):
# Issue #35615: Weakref keys or values getting GC'ed during dict
# copying should not result in a crash.
self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False)
+ @threading_helper.requires_working_threading()
def test_threaded_weak_key_dict_deepcopy(self):
# Issue #35615: Weakref keys or values getting GC'ed during dict
# copying should not result in a crash.
self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True)
+ @threading_helper.requires_working_threading()
def test_threaded_weak_value_dict_copy(self):
# Issue #35615: Weakref keys or values getting GC'ed during dict
# copying should not result in a crash.
self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False)
+ @threading_helper.requires_working_threading()
def test_threaded_weak_value_dict_deepcopy(self):
# Issue #35615: Weakref keys or values getting GC'ed during dict
# copying should not result in a crash.
--- /dev/null
+Tests that require working threading are now skipped on WASM targets without pthread support.
else
- case $ac_sys_system in #(
- Emscripten) :
- enable_wasm_dynamic_linking=no ;; #(
- WASI) :
- enable_wasm_dynamic_linking=no ;; #(
- *) :
- enable_wasm_dynamic_linking=missing
- ;;
-esac
+ enable_wasm_dynamic_linking=missing
fi
[AC_MSG_ERROR([--enable-wasm-dynamic-linking only applies to Emscripten and WASI])]
)
], [
- AS_CASE([$ac_sys_system],
- [Emscripten], [enable_wasm_dynamic_linking=no],
- [WASI], [enable_wasm_dynamic_linking=no],
- [enable_wasm_dynamic_linking=missing]
- )
+ enable_wasm_dynamic_linking=missing
])
AC_MSG_RESULT([$enable_wasm_dynamic_linking])