import zipfile
from importlib.util import source_from_cache
+from test import support
from test.support.import_helper import make_legacy_pyc
# Cached result of the expensive test performed in the function below.
__cached_interp_requires_environment = None
+
def interpreter_requires_environment():
"""
Returns True if our sys.executable interpreter requires environment
rc = proc.returncode
return _PythonRunResult(rc, out, err), cmd_line
+
def _assert_python(expected_success, /, *args, **env_vars):
res, cmd_line = run_python_until_end(*args, **env_vars)
if (res.rc and expected_success) or (not res.rc and not expected_success):
res.fail(cmd_line)
return res
+
def assert_python_ok(*args, **env_vars):
"""
Assert that running the interpreter with `args` and optional environment
"""
return _assert_python(True, *args, **env_vars)
+
def assert_python_failure(*args, **env_vars):
"""
Assert that running the interpreter with `args` and optional environment
"""
return _assert_python(False, *args, **env_vars)
+
def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
"""Run a Python subprocess with the given arguments.
stdout=stdout, stderr=stderr,
**kw)
+
def kill_python(p):
"""Run the given Popen process until completion and return stdout."""
p.stdin.close()
subprocess._cleanup()
return data
+
def make_script(script_dir, script_basename, source, omit_suffix=False):
script_filename = script_basename
if not omit_suffix:
importlib.invalidate_caches()
return script_name
+
def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
zip_filename = zip_basename+os.extsep+'zip'
zip_name = os.path.join(zip_dir, zip_filename)
# zip_file.printdir()
return zip_name, os.path.join(zip_name, name_in_zip)
+
def make_pkg(pkg_dir, init_source=''):
os.mkdir(pkg_dir)
make_script(pkg_dir, '__init__', init_source)
+
def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
source, depth=1, compiled=False):
unlink = []
# print 'Contents of %r:' % zip_name
# zip_file.printdir()
return zip_name, os.path.join(zip_name, script_name_in_zip)
+
+
+def run_test_script(script):
+ # use -u to try to get the full output if the test hangs or crash
+ if support.verbose:
+ def title(text):
+ return f"===== {text} ======"
+
+ name = f"script {os.path.basename(script)}"
+ print()
+ print(title(name), flush=True)
+ # In verbose mode, the child process inherit stdout and stdout,
+ # to see output in realtime and reduce the risk of losing output.
+ args = [sys.executable, "-E", "-X", "faulthandler", "-u", script, "-v"]
+ proc = subprocess.run(args)
+ print(title(f"{name} completed: exit code {proc.returncode}"),
+ flush=True)
+ if proc.returncode:
+ raise AssertionError(f"{name} failed")
+ else:
+ assert_python_ok("-u", script, "-v")
def test_all(self):
# Run the tester in a sub-process, to make sure there is only one
# thread (for reliable signal delivery).
- tester = support.findfile("eintr_tester.py", subdir="eintrdata")
- # use -u to try to get the full output if the test hangs or crash
- args = ["-u", tester, "-v"]
- if support.verbose:
- print()
- print("--- run eintr_tester.py ---", flush=True)
- # In verbose mode, the child process inherit stdout and stdout,
- # to see output in realtime and reduce the risk of losing output.
- args = [sys.executable, "-E", "-X", "faulthandler", *args]
- proc = subprocess.run(args)
- print(f"--- eintr_tester.py completed: "
- f"exit code {proc.returncode} ---", flush=True)
- if proc.returncode:
- self.fail("eintr_tester.py failed")
- else:
- script_helper.assert_python_ok("-u", tester, "-v")
+ script = support.findfile("_test_eintr.py")
+ script_helper.run_test_script(script)
if __name__ == "__main__":