import contextlib
import decimal
import errno
-import fnmatch
import fractions
import itertools
import locale
import uuid
import warnings
from test import support
-from test.support import import_helper
from test.support import os_helper
from test.support import socket_helper
from test.support import infinite_recursion
from test.support import warnings_helper
from platform import win32_is_iot
+from .utils import create_file
try:
import resource
import fcntl
except ImportError:
fcntl = None
-try:
- import _winapi
-except ImportError:
- _winapi = None
try:
import pwd
all_users = [u.pw_uid for u in pwd.getpwall()]
return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
-def create_file(filename, content=b'content'):
- with open(filename, "xb", 0) as fp:
- fp.write(content)
-
-
# bpo-41625: On AIX, splice() only works with a socket, not with a pipe.
requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"),
'on AIX, splice() only accepts sockets')
self.fail('No OSError raised')
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32ErrorTests(unittest.TestCase):
- def setUp(self):
- try:
- os.stat(os_helper.TESTFN)
- except FileNotFoundError:
- exists = False
- except OSError as exc:
- exists = True
- self.fail("file %s must not exist; os.stat failed with %s"
- % (os_helper.TESTFN, exc))
- else:
- self.fail("file %s must not exist" % os_helper.TESTFN)
-
- def test_rename(self):
- self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
-
- def test_remove(self):
- self.assertRaises(OSError, os.remove, os_helper.TESTFN)
-
- def test_chdir(self):
- self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
-
- def test_mkdir(self):
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
-
- with open(os_helper.TESTFN, "x") as f:
- self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
-
- def test_utime(self):
- self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
-
- def test_chmod(self):
- self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
-
-
@unittest.skipIf(support.is_wasi, "Cannot create invalid FD on WASI.")
class TestInvalidFD(unittest.TestCase):
singles = ["fchdir", "dup", "fstat", "fstatvfs", "tcgetpgrp", "ttyname"]
for fn in self.unicodefn:
os.stat(os.path.join(self.dir, fn))
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32KillTests(unittest.TestCase):
- def _kill(self, sig):
- # Start sys.executable as a subprocess and communicate from the
- # subprocess to the parent that the interpreter is ready. When it
- # becomes ready, send *sig* via os.kill to the subprocess and check
- # that the return code is equal to *sig*.
- import ctypes
- from ctypes import wintypes
- import msvcrt
-
- # Since we can't access the contents of the process' stdout until the
- # process has exited, use PeekNamedPipe to see what's inside stdout
- # without waiting. This is done so we can tell that the interpreter
- # is started and running at a point where it could handle a signal.
- PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
- PeekNamedPipe.restype = wintypes.BOOL
- PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
- ctypes.POINTER(ctypes.c_char), # stdout buf
- wintypes.DWORD, # Buffer size
- ctypes.POINTER(wintypes.DWORD), # bytes read
- ctypes.POINTER(wintypes.DWORD), # bytes avail
- ctypes.POINTER(wintypes.DWORD)) # bytes left
- msg = "running"
- proc = subprocess.Popen([sys.executable, "-c",
- "import sys;"
- "sys.stdout.write('{}');"
- "sys.stdout.flush();"
- "input()".format(msg)],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- stdin=subprocess.PIPE)
- self.addCleanup(proc.stdout.close)
- self.addCleanup(proc.stderr.close)
- self.addCleanup(proc.stdin.close)
-
- count, max = 0, 100
- while count < max and proc.poll() is None:
- # Create a string buffer to store the result of stdout from the pipe
- buf = ctypes.create_string_buffer(len(msg))
- # Obtain the text currently in proc.stdout
- # Bytes read/avail/left are left as NULL and unused
- rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
- buf, ctypes.sizeof(buf), None, None, None)
- self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
- if buf.value:
- self.assertEqual(msg, buf.value.decode())
- break
- time.sleep(0.1)
- count += 1
- else:
- self.fail("Did not receive communication from the subprocess")
-
- os.kill(proc.pid, sig)
- self.assertEqual(proc.wait(), sig)
-
- def test_kill_sigterm(self):
- # SIGTERM doesn't mean anything special, but make sure it works
- self._kill(signal.SIGTERM)
-
- def test_kill_int(self):
- # os.kill on Windows can take an int which gets set as the exit code
- self._kill(100)
-
- @unittest.skipIf(mmap is None, "requires mmap")
- def _kill_with_event(self, event, name):
- tagname = "test_os_%s" % uuid.uuid1()
- m = mmap.mmap(-1, 1, tagname)
- m[0] = 0
-
- # Run a script which has console control handling enabled.
- script = os.path.join(os.path.dirname(__file__),
- "win_console_handler.py")
- cmd = [sys.executable, script, tagname]
- proc = subprocess.Popen(cmd,
- creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
-
- with proc:
- # Let the interpreter startup before we send signals. See #3137.
- for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
- if proc.poll() is None:
- break
- else:
- # Forcefully kill the process if we weren't able to signal it.
- proc.kill()
- self.fail("Subprocess didn't finish initialization")
-
- os.kill(proc.pid, event)
-
- try:
- # proc.send_signal(event) could also be done here.
- # Allow time for the signal to be passed and the process to exit.
- proc.wait(timeout=support.SHORT_TIMEOUT)
- except subprocess.TimeoutExpired:
- # Forcefully kill the process if we weren't able to signal it.
- proc.kill()
- self.fail("subprocess did not stop on {}".format(name))
-
- @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
- @support.requires_subprocess()
- def test_CTRL_C_EVENT(self):
- from ctypes import wintypes
- import ctypes
-
- # Make a NULL value by creating a pointer with no argument.
- NULL = ctypes.POINTER(ctypes.c_int)()
- SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
- SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
- wintypes.BOOL)
- SetConsoleCtrlHandler.restype = wintypes.BOOL
-
- # Calling this with NULL and FALSE causes the calling process to
- # handle Ctrl+C, rather than ignore it. This property is inherited
- # by subprocesses.
- SetConsoleCtrlHandler(NULL, 0)
-
- self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
-
- @support.requires_subprocess()
- def test_CTRL_BREAK_EVENT(self):
- self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
-
-
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32ListdirTests(unittest.TestCase):
- """Test listdir on Windows."""
-
- def setUp(self):
- self.created_paths = []
- for i in range(2):
- dir_name = 'SUB%d' % i
- dir_path = os.path.join(os_helper.TESTFN, dir_name)
- file_name = 'FILE%d' % i
- file_path = os.path.join(os_helper.TESTFN, file_name)
- os.makedirs(dir_path)
- with open(file_path, 'w', encoding='utf-8') as f:
- f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
- self.created_paths.extend([dir_name, file_name])
- self.created_paths.sort()
-
- def tearDown(self):
- shutil.rmtree(os_helper.TESTFN)
-
- def test_listdir_no_extended_path(self):
- """Test when the path is not an "extended" path."""
- # unicode
- self.assertEqual(
- sorted(os.listdir(os_helper.TESTFN)),
- self.created_paths)
-
- # bytes
- self.assertEqual(
- sorted(os.listdir(os.fsencode(os_helper.TESTFN))),
- [os.fsencode(path) for path in self.created_paths])
-
- def test_listdir_extended_path(self):
- """Test when the path starts with '\\\\?\\'."""
- # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
- # unicode
- path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
- self.assertEqual(
- sorted(os.listdir(path)),
- self.created_paths)
-
- # bytes
- path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
- self.assertEqual(
- sorted(os.listdir(path)),
- [os.fsencode(path) for path in self.created_paths])
-
-
-@unittest.skipUnless(os.name == "nt", "NT specific tests")
-class Win32ListdriveTests(unittest.TestCase):
- """Test listdrive, listmounts and listvolume on Windows."""
-
- def setUp(self):
- # Get drives and volumes from fsutil
- out = subprocess.check_output(
- ["fsutil.exe", "volume", "list"],
- cwd=os.path.join(os.getenv("SystemRoot", "\\Windows"), "System32"),
- encoding="mbcs",
- errors="ignore",
- )
- lines = out.splitlines()
- self.known_volumes = {l for l in lines if l.startswith('\\\\?\\')}
- self.known_drives = {l for l in lines if l[1:] == ':\\'}
- self.known_mounts = {l for l in lines if l[1:3] == ':\\'}
-
- def test_listdrives(self):
- drives = os.listdrives()
- self.assertIsInstance(drives, list)
- self.assertSetEqual(
- self.known_drives,
- self.known_drives & set(drives),
- )
-
- def test_listvolumes(self):
- volumes = os.listvolumes()
- self.assertIsInstance(volumes, list)
- self.assertSetEqual(
- self.known_volumes,
- self.known_volumes & set(volumes),
- )
-
- def test_listmounts(self):
- for volume in os.listvolumes():
- try:
- mounts = os.listmounts(volume)
- except OSError as ex:
- if support.verbose:
- print("Skipping", volume, "because of", ex)
- else:
- self.assertIsInstance(mounts, list)
- self.assertSetEqual(
- set(mounts),
- self.known_mounts & set(mounts),
- )
-
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
class ReadlinkTests(unittest.TestCase):
self.assertIsInstance(path, bytes)
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-@os_helper.skip_unless_symlink
-class Win32SymlinkTests(unittest.TestCase):
- filelink = 'filelinktest'
- filelink_target = os.path.abspath(__file__)
- dirlink = 'dirlinktest'
- dirlink_target = os.path.dirname(filelink_target)
- missing_link = 'missing link'
-
- def setUp(self):
- assert os.path.exists(self.dirlink_target)
- assert os.path.exists(self.filelink_target)
- assert not os.path.exists(self.dirlink)
- assert not os.path.exists(self.filelink)
- assert not os.path.exists(self.missing_link)
-
- def tearDown(self):
- if os.path.exists(self.filelink):
- os.remove(self.filelink)
- if os.path.exists(self.dirlink):
- os.rmdir(self.dirlink)
- if os.path.lexists(self.missing_link):
- os.remove(self.missing_link)
-
- def test_directory_link(self):
- os.symlink(self.dirlink_target, self.dirlink)
- self.assertTrue(os.path.exists(self.dirlink))
- self.assertTrue(os.path.isdir(self.dirlink))
- self.assertTrue(os.path.islink(self.dirlink))
- self.check_stat(self.dirlink, self.dirlink_target)
-
- def test_file_link(self):
- os.symlink(self.filelink_target, self.filelink)
- self.assertTrue(os.path.exists(self.filelink))
- self.assertTrue(os.path.isfile(self.filelink))
- self.assertTrue(os.path.islink(self.filelink))
- self.check_stat(self.filelink, self.filelink_target)
-
- def _create_missing_dir_link(self):
- 'Create a "directory" link to a non-existent target'
- linkname = self.missing_link
- if os.path.lexists(linkname):
- os.remove(linkname)
- target = r'c:\\target does not exist.29r3c740'
- assert not os.path.exists(target)
- target_is_dir = True
- os.symlink(target, linkname, target_is_dir)
-
- def test_remove_directory_link_to_missing_target(self):
- self._create_missing_dir_link()
- # For compatibility with Unix, os.remove will check the
- # directory status and call RemoveDirectory if the symlink
- # was created with target_is_dir==True.
- os.remove(self.missing_link)
-
- def test_isdir_on_directory_link_to_missing_target(self):
- self._create_missing_dir_link()
- self.assertFalse(os.path.isdir(self.missing_link))
-
- def test_rmdir_on_directory_link_to_missing_target(self):
- self._create_missing_dir_link()
- os.rmdir(self.missing_link)
-
- def check_stat(self, link, target):
- self.assertEqual(os.stat(link), os.stat(target))
- self.assertNotEqual(os.lstat(link), os.stat(link))
-
- bytes_link = os.fsencode(link)
- self.assertEqual(os.stat(bytes_link), os.stat(target))
- self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
-
- def test_12084(self):
- level1 = os.path.abspath(os_helper.TESTFN)
- level2 = os.path.join(level1, "level2")
- level3 = os.path.join(level2, "level3")
- self.addCleanup(os_helper.rmtree, level1)
-
- os.mkdir(level1)
- os.mkdir(level2)
- os.mkdir(level3)
-
- file1 = os.path.abspath(os.path.join(level1, "file1"))
- create_file(file1)
-
- orig_dir = os.getcwd()
- try:
- os.chdir(level2)
- link = os.path.join(level2, "link")
- os.symlink(os.path.relpath(file1), "link")
- self.assertIn("link", os.listdir(os.getcwd()))
-
- # Check os.stat calls from the same dir as the link
- self.assertEqual(os.stat(file1), os.stat("link"))
-
- # Check os.stat calls from a dir below the link
- os.chdir(level1)
- self.assertEqual(os.stat(file1),
- os.stat(os.path.relpath(link)))
-
- # Check os.stat calls from a dir above the link
- os.chdir(level3)
- self.assertEqual(os.stat(file1),
- os.stat(os.path.relpath(link)))
- finally:
- os.chdir(orig_dir)
-
- @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
- and os.path.exists(r'C:\ProgramData'),
- 'Test directories not found')
- def test_29248(self):
- # os.symlink() calls CreateSymbolicLink, which creates
- # the reparse data buffer with the print name stored
- # first, so the offset is always 0. CreateSymbolicLink
- # stores the "PrintName" DOS path (e.g. "C:\") first,
- # with an offset of 0, followed by the "SubstituteName"
- # NT path (e.g. "\??\C:\"). The "All Users" link, on
- # the other hand, seems to have been created manually
- # with an inverted order.
- target = os.readlink(r'C:\Users\All Users')
- self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
-
- def test_buffer_overflow(self):
- # Older versions would have a buffer overflow when detecting
- # whether a link source was a directory. This test ensures we
- # no longer crash, but does not otherwise validate the behavior
- segment = 'X' * 27
- path = os.path.join(*[segment] * 10)
- test_cases = [
- # overflow with absolute src
- ('\\' + path, segment),
- # overflow dest with relative src
- (segment, path),
- # overflow when joining src
- (path[:180], path[:180]),
- ]
- for src, dest in test_cases:
- try:
- os.symlink(src, dest)
- except FileNotFoundError:
- pass
- else:
- try:
- os.remove(dest)
- except OSError:
- pass
- # Also test with bytes, since that is a separate code path.
- try:
- os.symlink(os.fsencode(src), os.fsencode(dest))
- except FileNotFoundError:
- pass
- else:
- try:
- os.remove(dest)
- except OSError:
- pass
-
- def test_appexeclink(self):
- root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
- if not os.path.isdir(root):
- self.skipTest("test requires a WindowsApps directory")
-
- aliases = [os.path.join(root, a)
- for a in fnmatch.filter(os.listdir(root), '*.exe')]
-
- for alias in aliases:
- if support.verbose:
- print()
- print("Testing with", alias)
- st = os.lstat(alias)
- self.assertEqual(st, os.stat(alias))
- self.assertFalse(stat.S_ISLNK(st.st_mode))
- self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
- self.assertTrue(os.path.isfile(alias))
- # testing the first one we see is sufficient
- break
- else:
- self.skipTest("test requires an app execution alias")
-
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32JunctionTests(unittest.TestCase):
- junction = 'junctiontest'
- junction_target = os.path.dirname(os.path.abspath(__file__))
-
- def setUp(self):
- assert os.path.exists(self.junction_target)
- assert not os.path.lexists(self.junction)
-
- def tearDown(self):
- if os.path.lexists(self.junction):
- os.unlink(self.junction)
-
- def test_create_junction(self):
- _winapi.CreateJunction(self.junction_target, self.junction)
- self.assertTrue(os.path.lexists(self.junction))
- self.assertTrue(os.path.exists(self.junction))
- self.assertTrue(os.path.isdir(self.junction))
- self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
- self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
-
- # bpo-37834: Junctions are not recognized as links.
- self.assertFalse(os.path.islink(self.junction))
- self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
- os.path.normcase(os.readlink(self.junction)))
-
- def test_unlink_removes_junction(self):
- _winapi.CreateJunction(self.junction_target, self.junction)
- self.assertTrue(os.path.exists(self.junction))
- self.assertTrue(os.path.lexists(self.junction))
-
- os.unlink(self.junction)
- self.assertFalse(os.path.exists(self.junction))
-
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32NtTests(unittest.TestCase):
- def test_getfinalpathname_handles(self):
- nt = import_helper.import_module('nt')
- ctypes = import_helper.import_module('ctypes')
- # Ruff false positive -- it thinks we're redefining `ctypes` here
- import ctypes.wintypes # noqa: F811
-
- kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
- kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
-
- kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
- kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
- ctypes.wintypes.LPDWORD)
-
- # This is a pseudo-handle that doesn't need to be closed
- hproc = kernel.GetCurrentProcess()
-
- handle_count = ctypes.wintypes.DWORD()
- ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
- self.assertEqual(1, ok)
-
- before_count = handle_count.value
-
- # The first two test the error path, __file__ tests the success path
- filenames = [
- r'\\?\C:',
- r'\\?\NUL',
- r'\\?\CONIN',
- __file__,
- ]
-
- for _ in range(10):
- for name in filenames:
- try:
- nt._getfinalpathname(name)
- except Exception:
- # Failure is expected
- pass
- try:
- os.stat(name)
- except Exception:
- pass
-
- ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
- self.assertEqual(1, ok)
-
- handle_delta = handle_count.value - before_count
-
- self.assertEqual(0, handle_delta)
-
- @support.requires_subprocess()
- def test_stat_unlink_race(self):
- # bpo-46785: the implementation of os.stat() falls back to reading
- # the parent directory if CreateFileW() fails with a permission
- # error. If reading the parent directory fails because the file or
- # directory are subsequently unlinked, or because the volume or
- # share are no longer available, then the original permission error
- # should not be restored.
- filename = os_helper.TESTFN
- self.addCleanup(os_helper.unlink, filename)
- deadline = time.time() + 5
- command = textwrap.dedent("""\
- import os
- import sys
- import time
-
- filename = sys.argv[1]
- deadline = float(sys.argv[2])
-
- while time.time() < deadline:
- try:
- with open(filename, "w") as f:
- pass
- except OSError:
- pass
- try:
- os.remove(filename)
- except OSError:
- pass
- """)
-
- with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc:
- while time.time() < deadline:
- try:
- os.stat(filename)
- except FileNotFoundError as e:
- assert e.winerror == 2 # ERROR_FILE_NOT_FOUND
- try:
- proc.wait(1)
- except subprocess.TimeoutExpired:
- proc.terminate()
-
- @support.requires_subprocess()
- def test_stat_inaccessible_file(self):
- filename = os_helper.TESTFN
- ICACLS = os.path.expandvars(r"%SystemRoot%\System32\icacls.exe")
-
- with open(filename, "wb") as f:
- f.write(b'Test data')
-
- stat1 = os.stat(filename)
-
- try:
- # Remove all permissions from the file
- subprocess.check_output([ICACLS, filename, "/inheritance:r"],
- stderr=subprocess.STDOUT)
- except subprocess.CalledProcessError as ex:
- if support.verbose:
- print(ICACLS, filename, "/inheritance:r", "failed.")
- print(ex.stdout.decode("oem", "replace").rstrip())
- try:
- os.unlink(filename)
- except OSError:
- pass
- self.skipTest("Unable to create inaccessible file")
-
- def cleanup():
- # Give delete permission to the owner (us)
- subprocess.check_output([ICACLS, filename, "/grant", "*WD:(D)"],
- stderr=subprocess.STDOUT)
- os.unlink(filename)
-
- self.addCleanup(cleanup)
-
- if support.verbose:
- print("File:", filename)
- print("stat with access:", stat1)
-
- # First test - we shouldn't raise here, because we still have access to
- # the directory and can extract enough information from its metadata.
- stat2 = os.stat(filename)
-
- if support.verbose:
- print(" without access:", stat2)
-
- # We may not get st_dev/st_ino, so ensure those are 0 or match
- self.assertIn(stat2.st_dev, (0, stat1.st_dev))
- self.assertIn(stat2.st_ino, (0, stat1.st_ino))
-
- # st_mode and st_size should match (for a normal file, at least)
- self.assertEqual(stat1.st_mode, stat2.st_mode)
- self.assertEqual(stat1.st_size, stat2.st_size)
-
- # st_ctime and st_mtime should be the same
- self.assertEqual(stat1.st_ctime, stat2.st_ctime)
- self.assertEqual(stat1.st_mtime, stat2.st_mtime)
-
- # st_atime should be the same or later
- self.assertGreaterEqual(stat1.st_atime, stat2.st_atime)
-
-
@os_helper.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
--- /dev/null
+import sys
+import unittest
+
+if sys.platform != "win32":
+ raise unittest.SkipTest("Win32 specific tests")
+
+import _winapi
+import fnmatch
+import mmap
+import os
+import shutil
+import signal
+import stat
+import subprocess
+import textwrap
+import time
+import uuid
+from test import support
+from test.support import import_helper
+from test.support import os_helper
+from .utils import create_file
+
+
+class Win32ErrorTests(unittest.TestCase):
+ def setUp(self):
+ try:
+ os.stat(os_helper.TESTFN)
+ except FileNotFoundError:
+ exists = False
+ except OSError as exc:
+ exists = True
+ self.fail("file %s must not exist; os.stat failed with %s"
+ % (os_helper.TESTFN, exc))
+ else:
+ self.fail("file %s must not exist" % os_helper.TESTFN)
+
+ def test_rename(self):
+ self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
+
+ def test_remove(self):
+ self.assertRaises(OSError, os.remove, os_helper.TESTFN)
+
+ def test_chdir(self):
+ self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
+
+ def test_mkdir(self):
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
+
+ with open(os_helper.TESTFN, "x") as f:
+ self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
+
+ def test_utime(self):
+ self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
+
+ def test_chmod(self):
+ self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
+
+
+class Win32KillTests(unittest.TestCase):
+ def _kill(self, sig):
+ # Start sys.executable as a subprocess and communicate from the
+ # subprocess to the parent that the interpreter is ready. When it
+ # becomes ready, send *sig* via os.kill to the subprocess and check
+ # that the return code is equal to *sig*.
+ import ctypes
+ from ctypes import wintypes
+ import msvcrt
+
+ # Since we can't access the contents of the process' stdout until the
+ # process has exited, use PeekNamedPipe to see what's inside stdout
+ # without waiting. This is done so we can tell that the interpreter
+ # is started and running at a point where it could handle a signal.
+ PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
+ PeekNamedPipe.restype = wintypes.BOOL
+ PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
+ ctypes.POINTER(ctypes.c_char), # stdout buf
+ wintypes.DWORD, # Buffer size
+ ctypes.POINTER(wintypes.DWORD), # bytes read
+ ctypes.POINTER(wintypes.DWORD), # bytes avail
+ ctypes.POINTER(wintypes.DWORD)) # bytes left
+ msg = "running"
+ proc = subprocess.Popen([sys.executable, "-c",
+ "import sys;"
+ "sys.stdout.write('{}');"
+ "sys.stdout.flush();"
+ "input()".format(msg)],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ stdin=subprocess.PIPE)
+ self.addCleanup(proc.stdout.close)
+ self.addCleanup(proc.stderr.close)
+ self.addCleanup(proc.stdin.close)
+
+ count, max = 0, 100
+ while count < max and proc.poll() is None:
+ # Create a string buffer to store the result of stdout from the pipe
+ buf = ctypes.create_string_buffer(len(msg))
+ # Obtain the text currently in proc.stdout
+ # Bytes read/avail/left are left as NULL and unused
+ rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
+ buf, ctypes.sizeof(buf), None, None, None)
+ self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
+ if buf.value:
+ self.assertEqual(msg, buf.value.decode())
+ break
+ time.sleep(0.1)
+ count += 1
+ else:
+ self.fail("Did not receive communication from the subprocess")
+
+ os.kill(proc.pid, sig)
+ self.assertEqual(proc.wait(), sig)
+
+ def test_kill_sigterm(self):
+ # SIGTERM doesn't mean anything special, but make sure it works
+ self._kill(signal.SIGTERM)
+
+ def test_kill_int(self):
+ # os.kill on Windows can take an int which gets set as the exit code
+ self._kill(100)
+
+ @unittest.skipIf(mmap is None, "requires mmap")
+ def _kill_with_event(self, event, name):
+ tagname = "test_os_%s" % uuid.uuid1()
+ m = mmap.mmap(-1, 1, tagname)
+ m[0] = 0
+
+ # Run a script which has console control handling enabled.
+ script = os.path.join(os.path.dirname(__file__),
+ "win_console_handler.py")
+ cmd = [sys.executable, script, tagname]
+ proc = subprocess.Popen(cmd,
+ creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
+
+ with proc:
+ # Let the interpreter startup before we send signals. See #3137.
+ for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+ if proc.poll() is None:
+ break
+ else:
+ # Forcefully kill the process if we weren't able to signal it.
+ proc.kill()
+ self.fail("Subprocess didn't finish initialization")
+
+ os.kill(proc.pid, event)
+
+ try:
+ # proc.send_signal(event) could also be done here.
+ # Allow time for the signal to be passed and the process to exit.
+ proc.wait(timeout=support.SHORT_TIMEOUT)
+ except subprocess.TimeoutExpired:
+ # Forcefully kill the process if we weren't able to signal it.
+ proc.kill()
+ self.fail("subprocess did not stop on {}".format(name))
+
+ @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
+ @support.requires_subprocess()
+ def test_CTRL_C_EVENT(self):
+ from ctypes import wintypes
+ import ctypes
+
+ # Make a NULL value by creating a pointer with no argument.
+ NULL = ctypes.POINTER(ctypes.c_int)()
+ SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
+ SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
+ wintypes.BOOL)
+ SetConsoleCtrlHandler.restype = wintypes.BOOL
+
+ # Calling this with NULL and FALSE causes the calling process to
+ # handle Ctrl+C, rather than ignore it. This property is inherited
+ # by subprocesses.
+ SetConsoleCtrlHandler(NULL, 0)
+
+ self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
+
+ @support.requires_subprocess()
+ def test_CTRL_BREAK_EVENT(self):
+ self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
+
+
+class Win32ListdirTests(unittest.TestCase):
+ """Test listdir on Windows."""
+
+ def setUp(self):
+ self.created_paths = []
+ for i in range(2):
+ dir_name = 'SUB%d' % i
+ dir_path = os.path.join(os_helper.TESTFN, dir_name)
+ file_name = 'FILE%d' % i
+ file_path = os.path.join(os_helper.TESTFN, file_name)
+ os.makedirs(dir_path)
+ with open(file_path, 'w', encoding='utf-8') as f:
+ f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
+ self.created_paths.extend([dir_name, file_name])
+ self.created_paths.sort()
+
+ def tearDown(self):
+ shutil.rmtree(os_helper.TESTFN)
+
+ def test_listdir_no_extended_path(self):
+ """Test when the path is not an "extended" path."""
+ # unicode
+ self.assertEqual(
+ sorted(os.listdir(os_helper.TESTFN)),
+ self.created_paths)
+
+ # bytes
+ self.assertEqual(
+ sorted(os.listdir(os.fsencode(os_helper.TESTFN))),
+ [os.fsencode(path) for path in self.created_paths])
+
+ def test_listdir_extended_path(self):
+ """Test when the path starts with '\\\\?\\'."""
+ # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
+ # unicode
+ path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
+ self.assertEqual(
+ sorted(os.listdir(path)),
+ self.created_paths)
+
+ # bytes
+ path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
+ self.assertEqual(
+ sorted(os.listdir(path)),
+ [os.fsencode(path) for path in self.created_paths])
+
+
+@unittest.skipUnless(os.name == "nt", "NT specific tests")
+class Win32ListdriveTests(unittest.TestCase):
+ """Test listdrive, listmounts and listvolume on Windows."""
+
+ def setUp(self):
+ # Get drives and volumes from fsutil
+ out = subprocess.check_output(
+ ["fsutil.exe", "volume", "list"],
+ cwd=os.path.join(os.getenv("SystemRoot", "\\Windows"), "System32"),
+ encoding="mbcs",
+ errors="ignore",
+ )
+ lines = out.splitlines()
+ self.known_volumes = {l for l in lines if l.startswith('\\\\?\\')}
+ self.known_drives = {l for l in lines if l[1:] == ':\\'}
+ self.known_mounts = {l for l in lines if l[1:3] == ':\\'}
+
+ def test_listdrives(self):
+ drives = os.listdrives()
+ self.assertIsInstance(drives, list)
+ self.assertSetEqual(
+ self.known_drives,
+ self.known_drives & set(drives),
+ )
+
+ def test_listvolumes(self):
+ volumes = os.listvolumes()
+ self.assertIsInstance(volumes, list)
+ self.assertSetEqual(
+ self.known_volumes,
+ self.known_volumes & set(volumes),
+ )
+
+ def test_listmounts(self):
+ for volume in os.listvolumes():
+ try:
+ mounts = os.listmounts(volume)
+ except OSError as ex:
+ if support.verbose:
+ print("Skipping", volume, "because of", ex)
+ else:
+ self.assertIsInstance(mounts, list)
+ self.assertSetEqual(
+ set(mounts),
+ self.known_mounts & set(mounts),
+ )
+
+
+@os_helper.skip_unless_symlink
+class Win32SymlinkTests(unittest.TestCase):
+ filelink = 'filelinktest'
+ filelink_target = os.path.abspath(__file__)
+ dirlink = 'dirlinktest'
+ dirlink_target = os.path.dirname(filelink_target)
+ missing_link = 'missing link'
+
+ def setUp(self):
+ assert os.path.exists(self.dirlink_target)
+ assert os.path.exists(self.filelink_target)
+ assert not os.path.exists(self.dirlink)
+ assert not os.path.exists(self.filelink)
+ assert not os.path.exists(self.missing_link)
+
+ def tearDown(self):
+ if os.path.exists(self.filelink):
+ os.remove(self.filelink)
+ if os.path.exists(self.dirlink):
+ os.rmdir(self.dirlink)
+ if os.path.lexists(self.missing_link):
+ os.remove(self.missing_link)
+
+ def test_directory_link(self):
+ os.symlink(self.dirlink_target, self.dirlink)
+ self.assertTrue(os.path.exists(self.dirlink))
+ self.assertTrue(os.path.isdir(self.dirlink))
+ self.assertTrue(os.path.islink(self.dirlink))
+ self.check_stat(self.dirlink, self.dirlink_target)
+
+ def test_file_link(self):
+ os.symlink(self.filelink_target, self.filelink)
+ self.assertTrue(os.path.exists(self.filelink))
+ self.assertTrue(os.path.isfile(self.filelink))
+ self.assertTrue(os.path.islink(self.filelink))
+ self.check_stat(self.filelink, self.filelink_target)
+
+ def _create_missing_dir_link(self):
+ 'Create a "directory" link to a non-existent target'
+ linkname = self.missing_link
+ if os.path.lexists(linkname):
+ os.remove(linkname)
+ target = r'c:\\target does not exist.29r3c740'
+ assert not os.path.exists(target)
+ target_is_dir = True
+ os.symlink(target, linkname, target_is_dir)
+
+ def test_remove_directory_link_to_missing_target(self):
+ self._create_missing_dir_link()
+ # For compatibility with Unix, os.remove will check the
+ # directory status and call RemoveDirectory if the symlink
+ # was created with target_is_dir==True.
+ os.remove(self.missing_link)
+
+ def test_isdir_on_directory_link_to_missing_target(self):
+ self._create_missing_dir_link()
+ self.assertFalse(os.path.isdir(self.missing_link))
+
+ def test_rmdir_on_directory_link_to_missing_target(self):
+ self._create_missing_dir_link()
+ os.rmdir(self.missing_link)
+
+ def check_stat(self, link, target):
+ self.assertEqual(os.stat(link), os.stat(target))
+ self.assertNotEqual(os.lstat(link), os.stat(link))
+
+ bytes_link = os.fsencode(link)
+ self.assertEqual(os.stat(bytes_link), os.stat(target))
+ self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
+
+ def test_12084(self):
+ level1 = os.path.abspath(os_helper.TESTFN)
+ level2 = os.path.join(level1, "level2")
+ level3 = os.path.join(level2, "level3")
+ self.addCleanup(os_helper.rmtree, level1)
+
+ os.mkdir(level1)
+ os.mkdir(level2)
+ os.mkdir(level3)
+
+ file1 = os.path.abspath(os.path.join(level1, "file1"))
+ create_file(file1)
+
+ orig_dir = os.getcwd()
+ try:
+ os.chdir(level2)
+ link = os.path.join(level2, "link")
+ os.symlink(os.path.relpath(file1), "link")
+ self.assertIn("link", os.listdir(os.getcwd()))
+
+ # Check os.stat calls from the same dir as the link
+ self.assertEqual(os.stat(file1), os.stat("link"))
+
+ # Check os.stat calls from a dir below the link
+ os.chdir(level1)
+ self.assertEqual(os.stat(file1),
+ os.stat(os.path.relpath(link)))
+
+ # Check os.stat calls from a dir above the link
+ os.chdir(level3)
+ self.assertEqual(os.stat(file1),
+ os.stat(os.path.relpath(link)))
+ finally:
+ os.chdir(orig_dir)
+
+ @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
+ and os.path.exists(r'C:\ProgramData'),
+ 'Test directories not found')
+ def test_29248(self):
+ # os.symlink() calls CreateSymbolicLink, which creates
+ # the reparse data buffer with the print name stored
+ # first, so the offset is always 0. CreateSymbolicLink
+ # stores the "PrintName" DOS path (e.g. "C:\") first,
+ # with an offset of 0, followed by the "SubstituteName"
+ # NT path (e.g. "\??\C:\"). The "All Users" link, on
+ # the other hand, seems to have been created manually
+ # with an inverted order.
+ target = os.readlink(r'C:\Users\All Users')
+ self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
+
+ def test_buffer_overflow(self):
+ # Older versions would have a buffer overflow when detecting
+ # whether a link source was a directory. This test ensures we
+ # no longer crash, but does not otherwise validate the behavior
+ segment = 'X' * 27
+ path = os.path.join(*[segment] * 10)
+ test_cases = [
+ # overflow with absolute src
+ ('\\' + path, segment),
+ # overflow dest with relative src
+ (segment, path),
+ # overflow when joining src
+ (path[:180], path[:180]),
+ ]
+ for src, dest in test_cases:
+ try:
+ os.symlink(src, dest)
+ except FileNotFoundError:
+ pass
+ else:
+ try:
+ os.remove(dest)
+ except OSError:
+ pass
+ # Also test with bytes, since that is a separate code path.
+ try:
+ os.symlink(os.fsencode(src), os.fsencode(dest))
+ except FileNotFoundError:
+ pass
+ else:
+ try:
+ os.remove(dest)
+ except OSError:
+ pass
+
+ def test_appexeclink(self):
+ root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
+ if not os.path.isdir(root):
+ self.skipTest("test requires a WindowsApps directory")
+
+ aliases = [os.path.join(root, a)
+ for a in fnmatch.filter(os.listdir(root), '*.exe')]
+
+ for alias in aliases:
+ if support.verbose:
+ print()
+ print("Testing with", alias)
+ st = os.lstat(alias)
+ self.assertEqual(st, os.stat(alias))
+ self.assertFalse(stat.S_ISLNK(st.st_mode))
+ self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
+ self.assertTrue(os.path.isfile(alias))
+ # testing the first one we see is sufficient
+ break
+ else:
+ self.skipTest("test requires an app execution alias")
+
+
+class Win32JunctionTests(unittest.TestCase):
+ junction = 'junctiontest'
+ junction_target = os.path.dirname(os.path.abspath(__file__))
+
+ def setUp(self):
+ assert os.path.exists(self.junction_target)
+ assert not os.path.lexists(self.junction)
+
+ def tearDown(self):
+ if os.path.lexists(self.junction):
+ os.unlink(self.junction)
+
+ def test_create_junction(self):
+ _winapi.CreateJunction(self.junction_target, self.junction)
+ self.assertTrue(os.path.lexists(self.junction))
+ self.assertTrue(os.path.exists(self.junction))
+ self.assertTrue(os.path.isdir(self.junction))
+ self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
+ self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
+
+ # bpo-37834: Junctions are not recognized as links.
+ self.assertFalse(os.path.islink(self.junction))
+ self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
+ os.path.normcase(os.readlink(self.junction)))
+
+ def test_unlink_removes_junction(self):
+ _winapi.CreateJunction(self.junction_target, self.junction)
+ self.assertTrue(os.path.exists(self.junction))
+ self.assertTrue(os.path.lexists(self.junction))
+
+ os.unlink(self.junction)
+ self.assertFalse(os.path.exists(self.junction))
+
+
+class Win32NtTests(unittest.TestCase):
+ def test_getfinalpathname_handles(self):
+ nt = import_helper.import_module('nt')
+ ctypes = import_helper.import_module('ctypes')
+ # Ruff false positive -- it thinks we're redefining `ctypes` here
+ import ctypes.wintypes # noqa: F811
+
+ kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
+ kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
+
+ kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
+ kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
+ ctypes.wintypes.LPDWORD)
+
+ # This is a pseudo-handle that doesn't need to be closed
+ hproc = kernel.GetCurrentProcess()
+
+ handle_count = ctypes.wintypes.DWORD()
+ ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
+ self.assertEqual(1, ok)
+
+ before_count = handle_count.value
+
+ # The first two test the error path, __file__ tests the success path
+ filenames = [
+ r'\\?\C:',
+ r'\\?\NUL',
+ r'\\?\CONIN',
+ __file__,
+ ]
+
+ for _ in range(10):
+ for name in filenames:
+ try:
+ nt._getfinalpathname(name)
+ except Exception:
+ # Failure is expected
+ pass
+ try:
+ os.stat(name)
+ except Exception:
+ pass
+
+ ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
+ self.assertEqual(1, ok)
+
+ handle_delta = handle_count.value - before_count
+
+ self.assertEqual(0, handle_delta)
+
+ @support.requires_subprocess()
+ def test_stat_unlink_race(self):
+ # bpo-46785: the implementation of os.stat() falls back to reading
+ # the parent directory if CreateFileW() fails with a permission
+ # error. If reading the parent directory fails because the file or
+ # directory are subsequently unlinked, or because the volume or
+ # share are no longer available, then the original permission error
+ # should not be restored.
+ filename = os_helper.TESTFN
+ self.addCleanup(os_helper.unlink, filename)
+ deadline = time.time() + 5
+ command = textwrap.dedent("""\
+ import os
+ import sys
+ import time
+
+ filename = sys.argv[1]
+ deadline = float(sys.argv[2])
+
+ while time.time() < deadline:
+ try:
+ with open(filename, "w") as f:
+ pass
+ except OSError:
+ pass
+ try:
+ os.remove(filename)
+ except OSError:
+ pass
+ """)
+
+ with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc:
+ while time.time() < deadline:
+ try:
+ os.stat(filename)
+ except FileNotFoundError as e:
+ assert e.winerror == 2 # ERROR_FILE_NOT_FOUND
+ try:
+ proc.wait(1)
+ except subprocess.TimeoutExpired:
+ proc.terminate()
+
+ @support.requires_subprocess()
+ def test_stat_inaccessible_file(self):
+ filename = os_helper.TESTFN
+ ICACLS = os.path.expandvars(r"%SystemRoot%\System32\icacls.exe")
+
+ with open(filename, "wb") as f:
+ f.write(b'Test data')
+
+ stat1 = os.stat(filename)
+
+ try:
+ # Remove all permissions from the file
+ subprocess.check_output([ICACLS, filename, "/inheritance:r"],
+ stderr=subprocess.STDOUT)
+ except subprocess.CalledProcessError as ex:
+ if support.verbose:
+ print(ICACLS, filename, "/inheritance:r", "failed.")
+ print(ex.stdout.decode("oem", "replace").rstrip())
+ try:
+ os.unlink(filename)
+ except OSError:
+ pass
+ self.skipTest("Unable to create inaccessible file")
+
+ def cleanup():
+ # Give delete permission to the owner (us)
+ subprocess.check_output([ICACLS, filename, "/grant", "*WD:(D)"],
+ stderr=subprocess.STDOUT)
+ os.unlink(filename)
+
+ self.addCleanup(cleanup)
+
+ if support.verbose:
+ print("File:", filename)
+ print("stat with access:", stat1)
+
+ # First test - we shouldn't raise here, because we still have access to
+ # the directory and can extract enough information from its metadata.
+ stat2 = os.stat(filename)
+
+ if support.verbose:
+ print(" without access:", stat2)
+
+ # We may not get st_dev/st_ino, so ensure those are 0 or match
+ self.assertIn(stat2.st_dev, (0, stat1.st_dev))
+ self.assertIn(stat2.st_ino, (0, stat1.st_ino))
+
+ # st_mode and st_size should match (for a normal file, at least)
+ self.assertEqual(stat1.st_mode, stat2.st_mode)
+ self.assertEqual(stat1.st_size, stat2.st_size)
+
+ # st_ctime and st_mtime should be the same
+ self.assertEqual(stat1.st_ctime, stat2.st_ctime)
+ self.assertEqual(stat1.st_mtime, stat2.st_mtime)
+
+ # st_atime should be the same or later
+ self.assertGreaterEqual(stat1.st_atime, stat2.st_atime)
+
+
+if __name__ == "__main__":
+ unittest.main()