import dis
-from test.support import import_module
+from test.support.import_helper import import_module
import unittest
_opcode = import_module("_opcode")
import threading
from test import support
+from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
+from test.support import warnings_helper
from io import BytesIO
if support.PGO:
"""Helper function to bind a socket according to its family."""
if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
# Make sure the path doesn't exist.
- support.unlink(addr)
+ os_helper.unlink(addr)
socket_helper.bind_unix_socket(sock, addr)
else:
sock.bind(addr)
class FileWrapperTest(unittest.TestCase):
def setUp(self):
self.d = b"It's not dead, it's sleeping!"
- with open(support.TESTFN, 'wb') as file:
+ with open(os_helper.TESTFN, 'wb') as file:
file.write(self.d)
def tearDown(self):
- support.unlink(support.TESTFN)
+ os_helper.unlink(os_helper.TESTFN)
def test_recv(self):
- fd = os.open(support.TESTFN, os.O_RDONLY)
+ fd = os.open(os_helper.TESTFN, os.O_RDONLY)
w = asyncore.file_wrapper(fd)
os.close(fd)
def test_send(self):
d1 = b"Come again?"
d2 = b"I want to buy some cheese."
- fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND)
+ fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_APPEND)
w = asyncore.file_wrapper(fd)
os.close(fd)
w.write(d1)
w.send(d2)
w.close()
- with open(support.TESTFN, 'rb') as file:
+ with open(os_helper.TESTFN, 'rb') as file:
self.assertEqual(file.read(), self.d + d1 + d2)
@unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
'asyncore.file_dispatcher required')
def test_dispatcher(self):
- fd = os.open(support.TESTFN, os.O_RDONLY)
+ fd = os.open(os_helper.TESTFN, os.O_RDONLY)
data = []
class FileDispatcher(asyncore.file_dispatcher):
def handle_read(self):
def test_resource_warning(self):
# Issue #11453
- fd = os.open(support.TESTFN, os.O_RDONLY)
+ fd = os.open(os_helper.TESTFN, os.O_RDONLY)
f = asyncore.file_wrapper(fd)
os.close(fd)
- with support.check_warnings(('', ResourceWarning)):
+ with warnings_helper.check_warnings(('', ResourceWarning)):
f = None
support.gc_collect()
def test_close_twice(self):
- fd = os.open(support.TESTFN, os.O_RDONLY)
+ fd = os.open(os_helper.TESTFN, os.O_RDONLY)
f = asyncore.file_wrapper(fd)
os.close(fd)
class TestAPI_UseUnixSockets(BaseTestAPI):
if HAS_UNIX_SOCKETS:
family = socket.AF_UNIX
- addr = support.TESTFN
+ addr = os_helper.TESTFN
def tearDown(self):
- support.unlink(self.addr)
+ os_helper.unlink(self.addr)
BaseTestAPI.tearDown(self)
class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
import binascii
import array
import re
-from test import support
+from test.support import warnings_helper
+
# Note: "*_hex" functions are aliases for "(un)hexlify"
b2a_functions = ['b2a_base64', 'b2a_hex', 'b2a_hqx', 'b2a_qp', 'b2a_uu',
self.assertTrue(hasattr(getattr(binascii, name), '__call__'))
self.assertRaises(TypeError, getattr(binascii, name))
- @support.ignore_warnings(category=DeprecationWarning)
+ @warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_returned_value(self):
# Limit to the minimum of all limits (b2a_uu)
MAX_ALL = 45
with self.assertRaises(TypeError):
binascii.b2a_uu(b"", True)
- @support.ignore_warnings(category=DeprecationWarning)
+ @warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_crc_hqx(self):
crc = binascii.crc_hqx(self.type2test(b"Test the CRC-32 of"), 0)
crc = binascii.crc_hqx(self.type2test(b" this string."), crc)
self.assertRaises(TypeError, binascii.crc32)
- @support.ignore_warnings(category=DeprecationWarning)
+ @warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_hqx(self):
# Perform binhex4 style RLE-compression
# Then calculate the hexbin4 binary-to-ASCII translation
res = binascii.rledecode_hqx(b)
self.assertEqual(res, self.rawdata)
- @support.ignore_warnings(category=DeprecationWarning)
+ @warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_rle(self):
# test repetition with a repetition longer than the limit of 255
data = (b'a' * 100 + b'b' + b'c' * 300)
self.assertEqual(b2a_qp(type2test(b'a.\n')), b'a.\n')
self.assertEqual(b2a_qp(type2test(b'.a')[:-1]), b'=2E')
- @support.ignore_warnings(category=DeprecationWarning)
+ @warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_empty_string(self):
# A test for SF bug #1022953. Make sure SystemError is not raised.
empty = self.type2test(b'')
# crc_hqx needs 2 arguments
self.assertRaises(TypeError, binascii.crc_hqx, "test", 0)
- @support.ignore_warnings(category=DeprecationWarning)
+ @warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_unicode_a2b(self):
# Unicode strings are accepted by a2b_* functions.
MAX_ALL = 45
import sys
import unittest
-from test import support
+from test.support import import_helper
from collections import UserList
-py_bisect = support.import_fresh_module('bisect', blocked=['_bisect'])
-c_bisect = support.import_fresh_module('bisect', fresh=['_bisect'])
+
+py_bisect = import_helper.import_fresh_module('bisect', blocked=['_bisect'])
+c_bisect = import_helper.import_fresh_module('bisect', fresh=['_bisect'])
class Range(object):
"""A trivial range()-like object that has an insert() method."""
from types import AsyncGeneratorType, FunctionType
from operator import neg
from test import support
-from test.support import (
- EnvironmentVarGuard, TESTFN, check_warnings, swap_attr, unlink,
- maybe_get_event_loop_policy)
+from test.support import (swap_attr, maybe_get_event_loop_policy)
+from test.support.os_helper import (EnvironmentVarGuard, TESTFN, unlink)
from test.support.script_helper import assert_python_ok
+from test.support.warnings_helper import check_warnings
from unittest.mock import MagicMock, patch
try:
import pty, signal
from test import support
+from test.support import import_helper
from test.support import threading_helper
# Skip tests if _multiprocessing wasn't built.
-support.import_module('_multiprocessing')
+import_helper.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
support.skip_if_broken_multiprocessing_synchronize()
import warnings
from test import support
+from test.support import os_helper
class SortedDict(collections.UserDict):
cf.add_section(s)
for j in range(10):
cf.set(s, 'lovely_spam{}'.format(j), self.wonderful_spam)
- with open(support.TESTFN, 'w') as f:
+ with open(os_helper.TESTFN, 'w') as f:
cf.write(f)
def tearDown(self):
- os.unlink(support.TESTFN)
+ os.unlink(os_helper.TESTFN)
def test_dominating_multiline_values(self):
# We're reading from file because this is where the code changed
# during performance updates in Python 3.2
cf_from_file = self.newconfig()
- with open(support.TESTFN) as f:
+ with open(os_helper.TESTFN) as f:
cf_from_file.read_file(f)
self.assertEqual(cf_from_file.get('section8', 'lovely_spam4'),
self.wonderful_spam.replace('\t\n', '\n'))
import unittest
import warnings
from test import support
+from test.support import import_helper
+from test.support import warnings_helper
from test.support.script_helper import assert_python_ok
def test_asyncio_1(self):
# asyncio cannot be imported when Python is compiled without thread
# support
- asyncio = support.import_module('asyncio')
+ asyncio = import_helper.import_module('asyncio')
class MyException(Exception):
pass
try:
warnings._warn_unawaited_coroutine = lambda coro: 1/0
with support.catch_unraisable_exception() as cm, \
- support.check_warnings((r'coroutine .* was never awaited',
- RuntimeWarning)):
+ warnings_helper.check_warnings(
+ (r'coroutine .* was never awaited',
+ RuntimeWarning)):
# only store repr() to avoid keeping the coroutine alive
coro = corofn()
coro_repr = repr(coro)
self.assertEqual(cm.unraisable.exc_type, ZeroDivisionError)
del warnings._warn_unawaited_coroutine
- with support.check_warnings((r'coroutine .* was never awaited',
- RuntimeWarning)):
+ with warnings_helper.check_warnings(
+ (r'coroutine .* was never awaited', RuntimeWarning)):
corofn()
support.gc_collect()
import tempfile
import unittest
-from test.support import requires, import_module, verbose, SaveSignals
+from test.support import requires, verbose, SaveSignals
+from test.support.import_helper import import_module
# Optionally test curses module. This currently requires that the
# 'curses' resource be given on the regrtest command line using the -u
import unittest
import sys
-from test.support import import_fresh_module, run_unittest
+from test.support import run_unittest
+from test.support.import_helper import import_fresh_module
+
TESTS = 'test.datetimetester'
-from test.support import (TESTFN, import_module, unlink,
- requires, _2G, _4G, gc_collect, cpython_only)
+from test.support import (requires, _2G, _4G, gc_collect, cpython_only)
+from test.support.import_helper import import_module
+from test.support.os_helper import TESTFN, unlink
import unittest
import os
import re
import time
import unittest
from test import support
+from test.support import os_helper
from test.support.script_helper import assert_python_ok, spawn_python
try:
import _testcapi
signal.set_wakeup_fd(signal.SIGINT, False)
def test_invalid_fd(self):
- fd = support.make_bad_fd()
+ fd = os_helper.make_bad_fd()
self.assertRaises((ValueError, OSError),
signal.set_wakeup_fd, fd)
import unittest
import unittest.mock
from test import support
+from test.support import import_helper
+from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
+from test.support import warnings_helper
import socket
import select
import time
except ImportError:
ctypes = None
-ssl = support.import_module("ssl")
+ssl = import_helper.import_module("ssl")
from ssl import TLSVersion, _TLSContentType, _TLSMessageType
s = socket.socket(socket.AF_INET)
ss = test_wrap_socket(s)
wr = weakref.ref(ss)
- with support.check_warnings(("", ResourceWarning)):
+ with warnings_helper.check_warnings(("", ResourceWarning)):
del ss
self.assertEqual(wr(), None)
self.assertEqual(len(paths), 6)
self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
- with support.EnvironmentVarGuard() as env:
+ with os_helper.EnvironmentVarGuard() as env:
env["SSL_CERT_DIR"] = CAPATH
env["SSL_CERT_FILE"] = CERTFILE
paths = ssl.get_default_verify_paths()
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- with support.EnvironmentVarGuard() as env:
+ with os_helper.EnvironmentVarGuard() as env:
env["SSL_CERT_DIR"] = CAPATH
env["SSL_CERT_FILE"] = CERTFILE
ctx.load_default_certs()
stats = ctx.cert_store_stats()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- with support.EnvironmentVarGuard() as env:
+ with os_helper.EnvironmentVarGuard() as env:
env["SSL_CERT_DIR"] = CAPATH
env["SSL_CERT_FILE"] = CERTFILE
ctx.load_default_certs()
def test_sendfile(self):
TEST_DATA = b"x" * 512
- with open(support.TESTFN, 'wb') as f:
+ with open(os_helper.TESTFN, 'wb') as f:
f.write(TEST_DATA)
- self.addCleanup(support.unlink, support.TESTFN)
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(SIGNING_CA)
with server:
with context.wrap_socket(socket.socket()) as s:
s.connect((HOST, server.port))
- with open(support.TESTFN, 'rb') as file:
+ with open(os_helper.TESTFN, 'rb') as file:
s.sendfile(file)
self.assertEqual(s.recv(1024), TEST_DATA)
class TestSSLDebug(unittest.TestCase):
- def keylog_lines(self, fname=support.TESTFN):
+ def keylog_lines(self, fname=os_helper.TESTFN):
with open(fname) as f:
return len(list(f))
@requires_keylog
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
def test_keylog_defaults(self):
- self.addCleanup(support.unlink, support.TESTFN)
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
self.assertEqual(ctx.keylog_filename, None)
- self.assertFalse(os.path.isfile(support.TESTFN))
- ctx.keylog_filename = support.TESTFN
- self.assertEqual(ctx.keylog_filename, support.TESTFN)
- self.assertTrue(os.path.isfile(support.TESTFN))
+ self.assertFalse(os.path.isfile(os_helper.TESTFN))
+ ctx.keylog_filename = os_helper.TESTFN
+ self.assertEqual(ctx.keylog_filename, os_helper.TESTFN)
+ self.assertTrue(os.path.isfile(os_helper.TESTFN))
self.assertEqual(self.keylog_lines(), 1)
ctx.keylog_filename = None
with self.assertRaises((IsADirectoryError, PermissionError)):
# Windows raises PermissionError
ctx.keylog_filename = os.path.dirname(
- os.path.abspath(support.TESTFN))
+ os.path.abspath(os_helper.TESTFN))
with self.assertRaises(TypeError):
ctx.keylog_filename = 1
@requires_keylog
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
def test_keylog_filename(self):
- self.addCleanup(support.unlink, support.TESTFN)
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
client_context, server_context, hostname = testing_context()
- client_context.keylog_filename = support.TESTFN
+ client_context.keylog_filename = os_helper.TESTFN
server = ThreadedEchoServer(context=server_context, chatty=False)
with server:
with client_context.wrap_socket(socket.socket(),
self.assertEqual(self.keylog_lines(), 6)
client_context.keylog_filename = None
- server_context.keylog_filename = support.TESTFN
+ server_context.keylog_filename = os_helper.TESTFN
server = ThreadedEchoServer(context=server_context, chatty=False)
with server:
with client_context.wrap_socket(socket.socket(),
s.connect((HOST, server.port))
self.assertGreaterEqual(self.keylog_lines(), 11)
- client_context.keylog_filename = support.TESTFN
- server_context.keylog_filename = support.TESTFN
+ client_context.keylog_filename = os_helper.TESTFN
+ server_context.keylog_filename = os_helper.TESTFN
server = ThreadedEchoServer(context=server_context, chatty=False)
with server:
with client_context.wrap_socket(socket.socket(),
"test is not compatible with ignore_environment")
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
def test_keylog_env(self):
- self.addCleanup(support.unlink, support.TESTFN)
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
with unittest.mock.patch.dict(os.environ):
- os.environ['SSLKEYLOGFILE'] = support.TESTFN
- self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
+ os.environ['SSLKEYLOGFILE'] = os_helper.TESTFN
+ self.assertEqual(os.environ['SSLKEYLOGFILE'], os_helper.TESTFN)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
self.assertEqual(ctx.keylog_filename, None)
ctx = ssl.create_default_context()
- self.assertEqual(ctx.keylog_filename, support.TESTFN)
+ self.assertEqual(ctx.keylog_filename, os_helper.TESTFN)
ctx = ssl._create_stdlib_context()
- self.assertEqual(ctx.keylog_filename, support.TESTFN)
+ self.assertEqual(ctx.keylog_filename, os_helper.TESTFN)
def test_msg_callback(self):
client_context, server_context, hostname = testing_context()
import tarfile
from test import support
+from test.support import os_helper
from test.support import script_helper
# Check for our compression modules.
def sha256sum(data):
return sha256(data).hexdigest()
-TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
+TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'
tarname = support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
@unittest.skipUnless(hasattr(os, "link"),
"Missing hardlink implementation")
- @support.skip_unless_symlink
+ @os_helper.skip_unless_symlink
def test_extract_hardlink(self):
# Test hardlink extraction (e.g. bug #857297).
with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
tar.extract("ustar/regtype", TEMPDIR)
- self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))
+ self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype"))
tar.extract("ustar/lnktype", TEMPDIR)
- self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
+ self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
data = f.read()
self.assertEqual(sha256sum(data), sha256_regtype)
tar.extract("ustar/symtype", TEMPDIR)
- self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
+ self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
data = f.read()
self.assertEqual(sha256sum(data), sha256_regtype)
self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
finally:
tar.close()
- support.rmtree(DIR)
+ os_helper.rmtree(DIR)
def test_extract_directory(self):
dirtype = "ustar/dirtype"
if sys.platform != "win32":
self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
finally:
- support.rmtree(DIR)
+ os_helper.rmtree(DIR)
def test_extractall_pathlike_name(self):
DIR = pathlib.Path(TEMPDIR) / "extractall"
- with support.temp_dir(DIR), \
+ with os_helper.temp_dir(DIR), \
tarfile.open(tarname, encoding="iso8859-1") as tar:
directories = [t for t in tar if t.isdir()]
tar.extractall(DIR, directories)
def test_extract_pathlike_name(self):
dirtype = "ustar/dirtype"
DIR = pathlib.Path(TEMPDIR) / "extractall"
- with support.temp_dir(DIR), \
+ with os_helper.temp_dir(DIR), \
tarfile.open(tarname, encoding="iso8859-1") as tar:
tarinfo = tar.getmember(dirtype)
tar.extract(tarinfo, path=DIR)
else:
self.fail("ReadError not raised")
finally:
- support.unlink(empty)
+ os_helper.unlink(empty)
def test_parallel_iteration(self):
# Issue #16601: Restarting iteration over tarfile continued
fobj.write(b'x' * 4096)
fobj.truncate()
s = os.stat(name)
- support.unlink(name)
+ os_helper.unlink(name)
return (s.st_blocks * 512 < s.st_size)
else:
return False
finally:
tar.close()
finally:
- support.rmdir(path)
+ os_helper.rmdir(path)
# mock the following:
# os.listdir: so we know that files are in the wrong order
finally:
tar.close()
finally:
- support.unlink(os.path.join(path, "1"))
- support.unlink(os.path.join(path, "2"))
- support.rmdir(path)
+ os_helper.unlink(os.path.join(path, "1"))
+ os_helper.unlink(os.path.join(path, "2"))
+ os_helper.rmdir(path)
def test_gettarinfo_pathlike_name(self):
with tarfile.open(tmpname, self.mode) as tar:
finally:
tar.close()
finally:
- support.unlink(target)
- support.unlink(link)
+ os_helper.unlink(target)
+ os_helper.unlink(link)
- @support.skip_unless_symlink
+ @os_helper.skip_unless_symlink
def test_symlink_size(self):
path = os.path.join(TEMPDIR, "symlink")
os.symlink("link_target", path)
finally:
tar.close()
finally:
- support.unlink(path)
+ os_helper.unlink(path)
def test_add_self(self):
# Test for #1257255.
self.assertEqual(tar.getnames(), [],
"added the archive to itself")
- with support.change_cwd(TEMPDIR):
+ with os_helper.change_cwd(TEMPDIR):
tar.add(dstname)
self.assertEqual(tar.getnames(), [],
"added the archive to itself")
try:
for name in ("foo", "bar", "baz"):
name = os.path.join(tempdir, name)
- support.create_empty_file(name)
+ os_helper.create_empty_file(name)
def filter(tarinfo):
if os.path.basename(tarinfo.name) == "bar":
finally:
tar.close()
finally:
- support.rmtree(tempdir)
+ os_helper.rmtree(tempdir)
# Guarantee that stored pathnames are not modified. Don't
# remove ./ or ../ or double slashes. Still make absolute
# and compare the stored name with the original.
foo = os.path.join(TEMPDIR, "foo")
if not dir:
- support.create_empty_file(foo)
+ os_helper.create_empty_file(foo)
else:
os.mkdir(foo)
tar.close()
if not dir:
- support.unlink(foo)
+ os_helper.unlink(foo)
else:
- support.rmdir(foo)
+ os_helper.rmdir(foo)
self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
- @support.skip_unless_symlink
+ @os_helper.skip_unless_symlink
def test_extractall_symlinks(self):
# Test if extractall works properly when tarfile contains symlinks
tempdir = os.path.join(TEMPDIR, "testsymlinks")
except OSError:
self.fail("extractall failed with symlinked files")
finally:
- support.unlink(temparchive)
- support.rmtree(tempdir)
+ os_helper.unlink(temparchive)
+ os_helper.rmtree(tempdir)
def test_pathnames(self):
self._test_pathname("foo")
def test_cwd(self):
# Test adding the current working directory.
- with support.change_cwd(TEMPDIR):
+ with os_helper.change_cwd(TEMPDIR):
tar = tarfile.open(tmpname, self.mode)
try:
tar.add(".")
# Test for issue #8464: Create files with correct
# permissions.
if os.path.exists(tmpname):
- support.unlink(tmpname)
+ os_helper.unlink(tmpname)
original_umask = os.umask(0o022)
try:
self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2)
self.assertEqual(buf_reg[device_headers], b"\0" * 16)
finally:
- support.rmtree(tempdir)
+ os_helper.rmtree(tempdir)
class CreateTest(WriteTestBase, unittest.TestCase):
file_path = os.path.join(TEMPDIR, "spameggs42")
def setUp(self):
- support.unlink(tmpname)
+ os_helper.unlink(tmpname)
@classmethod
def setUpClass(cls):
@classmethod
def tearDownClass(cls):
- support.unlink(cls.file_path)
+ os_helper.unlink(cls.file_path)
def test_create(self):
with tarfile.open(tmpname, self.mode) as tobj:
def tearDown(self):
self.tar.close()
- support.unlink(self.foo)
- support.unlink(self.bar)
+ os_helper.unlink(self.foo)
+ os_helper.unlink(self.bar)
def test_add_twice(self):
# The same name will be added as a REGTYPE every
def setUp(self):
self.tarname = tmpname
if os.path.exists(self.tarname):
- support.unlink(self.tarname)
+ os_helper.unlink(self.tarname)
def _create_testtar(self, mode="w:"):
with tarfile.open(tarname, encoding="iso8859-1") as src:
files = [support.findfile('tokenize_tests.txt'),
support.findfile('tokenize_tests-no-coding-cookie-'
'and-utf8-bom-sig-only.txt')]
- self.addCleanup(support.unlink, tar_name)
+ self.addCleanup(os_helper.unlink, tar_name)
with tarfile.open(tar_name, 'w') as tf:
for tardata in files:
tf.add(tardata, arcname=os.path.basename(tardata))
self.assertEqual(out, b'')
self.assertEqual(rc, 1)
finally:
- support.unlink(tmpname)
+ os_helper.unlink(tmpname)
def test_list_command(self):
for tar_name in testtarnames:
with tarfile.open(tmpname) as tar:
tar.getmembers()
finally:
- support.unlink(tmpname)
+ os_helper.unlink(tmpname)
def test_create_command_verbose(self):
files = [support.findfile('tokenize_tests.txt'),
with tarfile.open(tmpname) as tar:
tar.getmembers()
finally:
- support.unlink(tmpname)
+ os_helper.unlink(tmpname)
def test_create_command_dotless_filename(self):
files = [support.findfile('tokenize_tests.txt')]
with tarfile.open(dotlessname) as tar:
tar.getmembers()
finally:
- support.unlink(dotlessname)
+ os_helper.unlink(dotlessname)
def test_create_command_dot_started_filename(self):
tar_name = os.path.join(TEMPDIR, ".testtar")
with tarfile.open(tar_name) as tar:
tar.getmembers()
finally:
- support.unlink(tar_name)
+ os_helper.unlink(tar_name)
def test_create_command_compressed(self):
files = [support.findfile('tokenize_tests.txt'),
with filetype.taropen(tar_name) as tar:
tar.getmembers()
finally:
- support.unlink(tar_name)
+ os_helper.unlink(tar_name)
def test_extract_command(self):
self.make_simple_tarfile(tmpname)
for opt in '-e', '--extract':
try:
- with support.temp_cwd(tarextdir):
+ with os_helper.temp_cwd(tarextdir):
out = self.tarfilecmd(opt, tmpname)
self.assertEqual(out, b'')
finally:
- support.rmtree(tarextdir)
+ os_helper.rmtree(tarextdir)
def test_extract_command_verbose(self):
self.make_simple_tarfile(tmpname)
for opt in '-v', '--verbose':
try:
- with support.temp_cwd(tarextdir):
+ with os_helper.temp_cwd(tarextdir):
out = self.tarfilecmd(opt, '-e', tmpname,
PYTHONIOENCODING='utf-8')
self.assertIn(b' file is extracted.', out)
finally:
- support.rmtree(tarextdir)
+ os_helper.rmtree(tarextdir)
def test_extract_command_different_directory(self):
self.make_simple_tarfile(tmpname)
try:
- with support.temp_cwd(tarextdir):
+ with os_helper.temp_cwd(tarextdir):
out = self.tarfilecmd('-e', tmpname, 'spamdir')
self.assertEqual(out, b'')
finally:
- support.rmtree(tarextdir)
+ os_helper.rmtree(tarextdir)
def test_extract_command_invalid_file(self):
zipname = support.findfile('zipdir.zip')
- with support.temp_cwd(tarextdir):
+ with os_helper.temp_cwd(tarextdir):
rc, out, err = self.tarfilecmd_failure('-e', zipname)
self.assertIn(b' is not a tar archive.', err)
self.assertEqual(out, b'')
def setUpModule():
- support.unlink(TEMPDIR)
+ os_helper.unlink(TEMPDIR)
os.makedirs(TEMPDIR)
global testtarnames
# Create compressed tarfiles.
for c in GzipTest, Bz2Test, LzmaTest:
if c.open:
- support.unlink(c.tarname)
+ os_helper.unlink(c.tarname)
testtarnames.append(c.tarname)
with c.open(c.tarname, "wb") as tar:
tar.write(data)
def tearDownModule():
if os.path.exists(TEMPDIR):
- support.rmtree(TEMPDIR)
+ os_helper.rmtree(TEMPDIR)
if __name__ == "__main__":
unittest.main()
import test.support
from test.support import threading_helper
-from test.support import verbose, import_module, cpython_only
+from test.support import verbose, cpython_only
+from test.support.import_helper import import_module
from test.support.script_helper import assert_python_ok, assert_python_failure
import random
import os
import sys
-from test.support import TESTFN, TESTFN_UNICODE, FS_NONASCII, rmtree, unlink, captured_stdout
+from test.support import captured_stdout
+from test.support.os_helper import (TESTFN, rmtree, unlink)
from test.support.script_helper import assert_python_ok, assert_python_failure
import textwrap
import unittest
from test.support.script_helper import (assert_python_ok, assert_python_failure,
interpreter_requires_environment)
from test import support
+from test.support import os_helper
try:
import _testcapi
self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10)
# write on disk
- snapshot.dump(support.TESTFN)
- self.addCleanup(support.unlink, support.TESTFN)
+ snapshot.dump(os_helper.TESTFN)
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
# load from disk
- snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
+ snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
self.assertEqual(snapshot2.traces, snapshot.traces)
# tracemalloc must be tracing memory allocations to take a snapshot
# take a snapshot with a new attribute
snapshot = tracemalloc.take_snapshot()
snapshot.test_attr = "new"
- snapshot.dump(support.TESTFN)
- self.addCleanup(support.unlink, support.TESTFN)
+ snapshot.dump(os_helper.TESTFN)
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
# load() should recreate the attribute
- snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
+ snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
self.assertEqual(snapshot2.test_attr, "new")
def fork_child(self):
import io
import struct
from test import support
-from test.support import import_fresh_module
+from test.support.import_helper import import_fresh_module
import types
import unittest
import types
import pickle
from test import support
+from test.support import import_helper
import test.test_importlib.util
import unittest
with unittest.mock.patch('builtins.__import__', _import):
# Since loader.discover() can modify sys.path, restore it when done.
- with support.DirsOnSysPath():
+ with import_helper.DirsOnSysPath():
# Make sure to remove 'package' from sys.modules when done.
with test.test_importlib.util.uncache('package'):
suite = loader.discover('package')
with unittest.mock.patch('builtins.__import__', _import):
# Since loader.discover() can modify sys.path, restore it when done.
- with support.DirsOnSysPath():
+ with import_helper.DirsOnSysPath():
# Make sure to remove 'package' from sys.modules when done.
with test.test_importlib.util.uncache('package'):
with self.assertRaises(TypeError) as cm:
import sys
import textwrap
-from test import support
+from test.support import warnings_helper
import traceback
import unittest
class Test_OldTestResult(unittest.TestCase):
def assertOldResultWarning(self, test, failures):
- with support.check_warnings(("TestResult has no add.+ method,",
- RuntimeWarning)):
+ with warnings_helper.check_warnings(
+ ("TestResult has no add.+ method,", RuntimeWarning)):
result = OldResult()
test.run(result)
self.assertEqual(len(result.failures), failures)