]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-139322: Create test_os package (#139453)
authorVictor Stinner <vstinner@python.org>
Wed, 1 Oct 2025 14:42:45 +0000 (16:42 +0200)
committerGitHub <noreply@github.com>
Wed, 1 Oct 2025 14:42:45 +0000 (16:42 +0200)
* Move test_posix.py and test_os.py to Lib/test/test_os/.
* Split Windows specific test cases to a new test_windows.py file.

Lib/test/libregrtest/findtests.py
Lib/test/test_os/__init__.py [new file with mode: 0644]
Lib/test/test_os/test_os.py [moved from Lib/test/test_os.py with 88% similarity]
Lib/test/test_os/test_posix.py [moved from Lib/test/test_posix.py with 100% similarity]
Lib/test/test_os/test_windows.py [new file with mode: 0644]
Lib/test/test_os/utils.py [new file with mode: 0644]
Makefile.pre.in

index 79afaf9083ae596efd15c03e33a186c319de635c..6c0e50846a466bb342d2bbe54eda4d6c438e493d 100644 (file)
@@ -25,10 +25,11 @@ SPLITTESTDIRS: set[TestName] = {
     "test_gdb",
     "test_inspect",
     "test_io",
-    "test_pydoc",
     "test_multiprocessing_fork",
     "test_multiprocessing_forkserver",
     "test_multiprocessing_spawn",
+    "test_os",
+    "test_pydoc",
 }
 
 
diff --git a/Lib/test/test_os/__init__.py b/Lib/test/test_os/__init__.py
new file mode 100644 (file)
index 0000000..bc502ef
--- /dev/null
@@ -0,0 +1,6 @@
+import os.path
+from test.support import load_package_tests
+
+
+def load_tests(*args):
+    return load_package_tests(os.path.dirname(__file__), *args)
similarity index 88%
rename from Lib/test/test_os.py
rename to Lib/test/test_os/test_os.py
index 1180e27a7a53106db9bedff300efc45d38931d83..623d05235835c248d50ad283961e78c5b745994c 100644 (file)
@@ -7,7 +7,6 @@ import codecs
 import contextlib
 import decimal
 import errno
-import fnmatch
 import fractions
 import itertools
 import locale
@@ -31,12 +30,12 @@ import unittest
 import uuid
 import warnings
 from test import support
-from test.support import import_helper
 from test.support import os_helper
 from test.support import socket_helper
 from test.support import infinite_recursion
 from test.support import warnings_helper
 from platform import win32_is_iot
+from .utils import create_file
 
 try:
     import resource
@@ -46,10 +45,6 @@ try:
     import fcntl
 except ImportError:
     fcntl = None
-try:
-    import _winapi
-except ImportError:
-    _winapi = None
 try:
     import pwd
     all_users = [u.pw_uid for u in pwd.getpwall()]
@@ -93,11 +88,6 @@ def requires_os_func(name):
     return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
 
 
-def create_file(filename, content=b'content'):
-    with open(filename, "xb", 0) as fp:
-        fp.write(content)
-
-
 # bpo-41625: On AIX, splice() only works with a socket, not with a pipe.
 requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"),
                                        'on AIX, splice() only accepts sockets')
@@ -2466,42 +2456,6 @@ class ExecTests(unittest.TestCase):
             self.fail('No OSError raised')
 
 
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32ErrorTests(unittest.TestCase):
-    def setUp(self):
-        try:
-            os.stat(os_helper.TESTFN)
-        except FileNotFoundError:
-            exists = False
-        except OSError as exc:
-            exists = True
-            self.fail("file %s must not exist; os.stat failed with %s"
-                      % (os_helper.TESTFN, exc))
-        else:
-            self.fail("file %s must not exist" % os_helper.TESTFN)
-
-    def test_rename(self):
-        self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
-
-    def test_remove(self):
-        self.assertRaises(OSError, os.remove, os_helper.TESTFN)
-
-    def test_chdir(self):
-        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
-
-    def test_mkdir(self):
-        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
-
-        with open(os_helper.TESTFN, "x") as f:
-            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
-
-    def test_utime(self):
-        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
-
-    def test_chmod(self):
-        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
-
-
 @unittest.skipIf(support.is_wasi, "Cannot create invalid FD on WASI.")
 class TestInvalidFD(unittest.TestCase):
     singles = ["fchdir", "dup", "fstat", "fstatvfs", "tcgetpgrp", "ttyname"]
@@ -2836,224 +2790,6 @@ class Pep383Tests(unittest.TestCase):
         for fn in self.unicodefn:
             os.stat(os.path.join(self.dir, fn))
 
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32KillTests(unittest.TestCase):
-    def _kill(self, sig):
-        # Start sys.executable as a subprocess and communicate from the
-        # subprocess to the parent that the interpreter is ready. When it
-        # becomes ready, send *sig* via os.kill to the subprocess and check
-        # that the return code is equal to *sig*.
-        import ctypes
-        from ctypes import wintypes
-        import msvcrt
-
-        # Since we can't access the contents of the process' stdout until the
-        # process has exited, use PeekNamedPipe to see what's inside stdout
-        # without waiting. This is done so we can tell that the interpreter
-        # is started and running at a point where it could handle a signal.
-        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
-        PeekNamedPipe.restype = wintypes.BOOL
-        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
-                                  ctypes.POINTER(ctypes.c_char), # stdout buf
-                                  wintypes.DWORD, # Buffer size
-                                  ctypes.POINTER(wintypes.DWORD), # bytes read
-                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
-                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
-        msg = "running"
-        proc = subprocess.Popen([sys.executable, "-c",
-                                 "import sys;"
-                                 "sys.stdout.write('{}');"
-                                 "sys.stdout.flush();"
-                                 "input()".format(msg)],
-                                stdout=subprocess.PIPE,
-                                stderr=subprocess.PIPE,
-                                stdin=subprocess.PIPE)
-        self.addCleanup(proc.stdout.close)
-        self.addCleanup(proc.stderr.close)
-        self.addCleanup(proc.stdin.close)
-
-        count, max = 0, 100
-        while count < max and proc.poll() is None:
-            # Create a string buffer to store the result of stdout from the pipe
-            buf = ctypes.create_string_buffer(len(msg))
-            # Obtain the text currently in proc.stdout
-            # Bytes read/avail/left are left as NULL and unused
-            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
-                                 buf, ctypes.sizeof(buf), None, None, None)
-            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
-            if buf.value:
-                self.assertEqual(msg, buf.value.decode())
-                break
-            time.sleep(0.1)
-            count += 1
-        else:
-            self.fail("Did not receive communication from the subprocess")
-
-        os.kill(proc.pid, sig)
-        self.assertEqual(proc.wait(), sig)
-
-    def test_kill_sigterm(self):
-        # SIGTERM doesn't mean anything special, but make sure it works
-        self._kill(signal.SIGTERM)
-
-    def test_kill_int(self):
-        # os.kill on Windows can take an int which gets set as the exit code
-        self._kill(100)
-
-    @unittest.skipIf(mmap is None, "requires mmap")
-    def _kill_with_event(self, event, name):
-        tagname = "test_os_%s" % uuid.uuid1()
-        m = mmap.mmap(-1, 1, tagname)
-        m[0] = 0
-
-        # Run a script which has console control handling enabled.
-        script = os.path.join(os.path.dirname(__file__),
-                              "win_console_handler.py")
-        cmd = [sys.executable, script, tagname]
-        proc = subprocess.Popen(cmd,
-                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
-
-        with proc:
-            # Let the interpreter startup before we send signals. See #3137.
-            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
-                if proc.poll() is None:
-                    break
-            else:
-                # Forcefully kill the process if we weren't able to signal it.
-                proc.kill()
-                self.fail("Subprocess didn't finish initialization")
-
-            os.kill(proc.pid, event)
-
-            try:
-                # proc.send_signal(event) could also be done here.
-                # Allow time for the signal to be passed and the process to exit.
-                proc.wait(timeout=support.SHORT_TIMEOUT)
-            except subprocess.TimeoutExpired:
-                # Forcefully kill the process if we weren't able to signal it.
-                proc.kill()
-                self.fail("subprocess did not stop on {}".format(name))
-
-    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
-    @support.requires_subprocess()
-    def test_CTRL_C_EVENT(self):
-        from ctypes import wintypes
-        import ctypes
-
-        # Make a NULL value by creating a pointer with no argument.
-        NULL = ctypes.POINTER(ctypes.c_int)()
-        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
-        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
-                                          wintypes.BOOL)
-        SetConsoleCtrlHandler.restype = wintypes.BOOL
-
-        # Calling this with NULL and FALSE causes the calling process to
-        # handle Ctrl+C, rather than ignore it. This property is inherited
-        # by subprocesses.
-        SetConsoleCtrlHandler(NULL, 0)
-
-        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
-
-    @support.requires_subprocess()
-    def test_CTRL_BREAK_EVENT(self):
-        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
-
-
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32ListdirTests(unittest.TestCase):
-    """Test listdir on Windows."""
-
-    def setUp(self):
-        self.created_paths = []
-        for i in range(2):
-            dir_name = 'SUB%d' % i
-            dir_path = os.path.join(os_helper.TESTFN, dir_name)
-            file_name = 'FILE%d' % i
-            file_path = os.path.join(os_helper.TESTFN, file_name)
-            os.makedirs(dir_path)
-            with open(file_path, 'w', encoding='utf-8') as f:
-                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
-            self.created_paths.extend([dir_name, file_name])
-        self.created_paths.sort()
-
-    def tearDown(self):
-        shutil.rmtree(os_helper.TESTFN)
-
-    def test_listdir_no_extended_path(self):
-        """Test when the path is not an "extended" path."""
-        # unicode
-        self.assertEqual(
-                sorted(os.listdir(os_helper.TESTFN)),
-                self.created_paths)
-
-        # bytes
-        self.assertEqual(
-                sorted(os.listdir(os.fsencode(os_helper.TESTFN))),
-                [os.fsencode(path) for path in self.created_paths])
-
-    def test_listdir_extended_path(self):
-        """Test when the path starts with '\\\\?\\'."""
-        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
-        # unicode
-        path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
-        self.assertEqual(
-                sorted(os.listdir(path)),
-                self.created_paths)
-
-        # bytes
-        path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
-        self.assertEqual(
-                sorted(os.listdir(path)),
-                [os.fsencode(path) for path in self.created_paths])
-
-
-@unittest.skipUnless(os.name == "nt", "NT specific tests")
-class Win32ListdriveTests(unittest.TestCase):
-    """Test listdrive, listmounts and listvolume on Windows."""
-
-    def setUp(self):
-        # Get drives and volumes from fsutil
-        out = subprocess.check_output(
-            ["fsutil.exe", "volume", "list"],
-            cwd=os.path.join(os.getenv("SystemRoot", "\\Windows"), "System32"),
-            encoding="mbcs",
-            errors="ignore",
-        )
-        lines = out.splitlines()
-        self.known_volumes = {l for l in lines if l.startswith('\\\\?\\')}
-        self.known_drives = {l for l in lines if l[1:] == ':\\'}
-        self.known_mounts = {l for l in lines if l[1:3] == ':\\'}
-
-    def test_listdrives(self):
-        drives = os.listdrives()
-        self.assertIsInstance(drives, list)
-        self.assertSetEqual(
-            self.known_drives,
-            self.known_drives & set(drives),
-        )
-
-    def test_listvolumes(self):
-        volumes = os.listvolumes()
-        self.assertIsInstance(volumes, list)
-        self.assertSetEqual(
-            self.known_volumes,
-            self.known_volumes & set(volumes),
-        )
-
-    def test_listmounts(self):
-        for volume in os.listvolumes():
-            try:
-                mounts = os.listmounts(volume)
-            except OSError as ex:
-                if support.verbose:
-                    print("Skipping", volume, "because of", ex)
-            else:
-                self.assertIsInstance(mounts, list)
-                self.assertSetEqual(
-                    set(mounts),
-                    self.known_mounts & set(mounts),
-                )
-
 
 @unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
 class ReadlinkTests(unittest.TestCase):
@@ -3116,370 +2852,6 @@ class ReadlinkTests(unittest.TestCase):
         self.assertIsInstance(path, bytes)
 
 
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-@os_helper.skip_unless_symlink
-class Win32SymlinkTests(unittest.TestCase):
-    filelink = 'filelinktest'
-    filelink_target = os.path.abspath(__file__)
-    dirlink = 'dirlinktest'
-    dirlink_target = os.path.dirname(filelink_target)
-    missing_link = 'missing link'
-
-    def setUp(self):
-        assert os.path.exists(self.dirlink_target)
-        assert os.path.exists(self.filelink_target)
-        assert not os.path.exists(self.dirlink)
-        assert not os.path.exists(self.filelink)
-        assert not os.path.exists(self.missing_link)
-
-    def tearDown(self):
-        if os.path.exists(self.filelink):
-            os.remove(self.filelink)
-        if os.path.exists(self.dirlink):
-            os.rmdir(self.dirlink)
-        if os.path.lexists(self.missing_link):
-            os.remove(self.missing_link)
-
-    def test_directory_link(self):
-        os.symlink(self.dirlink_target, self.dirlink)
-        self.assertTrue(os.path.exists(self.dirlink))
-        self.assertTrue(os.path.isdir(self.dirlink))
-        self.assertTrue(os.path.islink(self.dirlink))
-        self.check_stat(self.dirlink, self.dirlink_target)
-
-    def test_file_link(self):
-        os.symlink(self.filelink_target, self.filelink)
-        self.assertTrue(os.path.exists(self.filelink))
-        self.assertTrue(os.path.isfile(self.filelink))
-        self.assertTrue(os.path.islink(self.filelink))
-        self.check_stat(self.filelink, self.filelink_target)
-
-    def _create_missing_dir_link(self):
-        'Create a "directory" link to a non-existent target'
-        linkname = self.missing_link
-        if os.path.lexists(linkname):
-            os.remove(linkname)
-        target = r'c:\\target does not exist.29r3c740'
-        assert not os.path.exists(target)
-        target_is_dir = True
-        os.symlink(target, linkname, target_is_dir)
-
-    def test_remove_directory_link_to_missing_target(self):
-        self._create_missing_dir_link()
-        # For compatibility with Unix, os.remove will check the
-        #  directory status and call RemoveDirectory if the symlink
-        #  was created with target_is_dir==True.
-        os.remove(self.missing_link)
-
-    def test_isdir_on_directory_link_to_missing_target(self):
-        self._create_missing_dir_link()
-        self.assertFalse(os.path.isdir(self.missing_link))
-
-    def test_rmdir_on_directory_link_to_missing_target(self):
-        self._create_missing_dir_link()
-        os.rmdir(self.missing_link)
-
-    def check_stat(self, link, target):
-        self.assertEqual(os.stat(link), os.stat(target))
-        self.assertNotEqual(os.lstat(link), os.stat(link))
-
-        bytes_link = os.fsencode(link)
-        self.assertEqual(os.stat(bytes_link), os.stat(target))
-        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
-
-    def test_12084(self):
-        level1 = os.path.abspath(os_helper.TESTFN)
-        level2 = os.path.join(level1, "level2")
-        level3 = os.path.join(level2, "level3")
-        self.addCleanup(os_helper.rmtree, level1)
-
-        os.mkdir(level1)
-        os.mkdir(level2)
-        os.mkdir(level3)
-
-        file1 = os.path.abspath(os.path.join(level1, "file1"))
-        create_file(file1)
-
-        orig_dir = os.getcwd()
-        try:
-            os.chdir(level2)
-            link = os.path.join(level2, "link")
-            os.symlink(os.path.relpath(file1), "link")
-            self.assertIn("link", os.listdir(os.getcwd()))
-
-            # Check os.stat calls from the same dir as the link
-            self.assertEqual(os.stat(file1), os.stat("link"))
-
-            # Check os.stat calls from a dir below the link
-            os.chdir(level1)
-            self.assertEqual(os.stat(file1),
-                             os.stat(os.path.relpath(link)))
-
-            # Check os.stat calls from a dir above the link
-            os.chdir(level3)
-            self.assertEqual(os.stat(file1),
-                             os.stat(os.path.relpath(link)))
-        finally:
-            os.chdir(orig_dir)
-
-    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
-                            and os.path.exists(r'C:\ProgramData'),
-                            'Test directories not found')
-    def test_29248(self):
-        # os.symlink() calls CreateSymbolicLink, which creates
-        # the reparse data buffer with the print name stored
-        # first, so the offset is always 0. CreateSymbolicLink
-        # stores the "PrintName" DOS path (e.g. "C:\") first,
-        # with an offset of 0, followed by the "SubstituteName"
-        # NT path (e.g. "\??\C:\"). The "All Users" link, on
-        # the other hand, seems to have been created manually
-        # with an inverted order.
-        target = os.readlink(r'C:\Users\All Users')
-        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
-
-    def test_buffer_overflow(self):
-        # Older versions would have a buffer overflow when detecting
-        # whether a link source was a directory. This test ensures we
-        # no longer crash, but does not otherwise validate the behavior
-        segment = 'X' * 27
-        path = os.path.join(*[segment] * 10)
-        test_cases = [
-            # overflow with absolute src
-            ('\\' + path, segment),
-            # overflow dest with relative src
-            (segment, path),
-            # overflow when joining src
-            (path[:180], path[:180]),
-        ]
-        for src, dest in test_cases:
-            try:
-                os.symlink(src, dest)
-            except FileNotFoundError:
-                pass
-            else:
-                try:
-                    os.remove(dest)
-                except OSError:
-                    pass
-            # Also test with bytes, since that is a separate code path.
-            try:
-                os.symlink(os.fsencode(src), os.fsencode(dest))
-            except FileNotFoundError:
-                pass
-            else:
-                try:
-                    os.remove(dest)
-                except OSError:
-                    pass
-
-    def test_appexeclink(self):
-        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
-        if not os.path.isdir(root):
-            self.skipTest("test requires a WindowsApps directory")
-
-        aliases = [os.path.join(root, a)
-                   for a in fnmatch.filter(os.listdir(root), '*.exe')]
-
-        for alias in aliases:
-            if support.verbose:
-                print()
-                print("Testing with", alias)
-            st = os.lstat(alias)
-            self.assertEqual(st, os.stat(alias))
-            self.assertFalse(stat.S_ISLNK(st.st_mode))
-            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
-            self.assertTrue(os.path.isfile(alias))
-            # testing the first one we see is sufficient
-            break
-        else:
-            self.skipTest("test requires an app execution alias")
-
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32JunctionTests(unittest.TestCase):
-    junction = 'junctiontest'
-    junction_target = os.path.dirname(os.path.abspath(__file__))
-
-    def setUp(self):
-        assert os.path.exists(self.junction_target)
-        assert not os.path.lexists(self.junction)
-
-    def tearDown(self):
-        if os.path.lexists(self.junction):
-            os.unlink(self.junction)
-
-    def test_create_junction(self):
-        _winapi.CreateJunction(self.junction_target, self.junction)
-        self.assertTrue(os.path.lexists(self.junction))
-        self.assertTrue(os.path.exists(self.junction))
-        self.assertTrue(os.path.isdir(self.junction))
-        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
-        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
-
-        # bpo-37834: Junctions are not recognized as links.
-        self.assertFalse(os.path.islink(self.junction))
-        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
-                         os.path.normcase(os.readlink(self.junction)))
-
-    def test_unlink_removes_junction(self):
-        _winapi.CreateJunction(self.junction_target, self.junction)
-        self.assertTrue(os.path.exists(self.junction))
-        self.assertTrue(os.path.lexists(self.junction))
-
-        os.unlink(self.junction)
-        self.assertFalse(os.path.exists(self.junction))
-
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32NtTests(unittest.TestCase):
-    def test_getfinalpathname_handles(self):
-        nt = import_helper.import_module('nt')
-        ctypes = import_helper.import_module('ctypes')
-        # Ruff false positive -- it thinks we're redefining `ctypes` here
-        import ctypes.wintypes  # noqa: F811
-
-        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
-        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
-
-        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
-        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
-                                                 ctypes.wintypes.LPDWORD)
-
-        # This is a pseudo-handle that doesn't need to be closed
-        hproc = kernel.GetCurrentProcess()
-
-        handle_count = ctypes.wintypes.DWORD()
-        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
-        self.assertEqual(1, ok)
-
-        before_count = handle_count.value
-
-        # The first two test the error path, __file__ tests the success path
-        filenames = [
-            r'\\?\C:',
-            r'\\?\NUL',
-            r'\\?\CONIN',
-            __file__,
-        ]
-
-        for _ in range(10):
-            for name in filenames:
-                try:
-                    nt._getfinalpathname(name)
-                except Exception:
-                    # Failure is expected
-                    pass
-                try:
-                    os.stat(name)
-                except Exception:
-                    pass
-
-        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
-        self.assertEqual(1, ok)
-
-        handle_delta = handle_count.value - before_count
-
-        self.assertEqual(0, handle_delta)
-
-    @support.requires_subprocess()
-    def test_stat_unlink_race(self):
-        # bpo-46785: the implementation of os.stat() falls back to reading
-        # the parent directory if CreateFileW() fails with a permission
-        # error. If reading the parent directory fails because the file or
-        # directory are subsequently unlinked, or because the volume or
-        # share are no longer available, then the original permission error
-        # should not be restored.
-        filename =  os_helper.TESTFN
-        self.addCleanup(os_helper.unlink, filename)
-        deadline = time.time() + 5
-        command = textwrap.dedent("""\
-            import os
-            import sys
-            import time
-
-            filename = sys.argv[1]
-            deadline = float(sys.argv[2])
-
-            while time.time() < deadline:
-                try:
-                    with open(filename, "w") as f:
-                        pass
-                except OSError:
-                    pass
-                try:
-                    os.remove(filename)
-                except OSError:
-                    pass
-            """)
-
-        with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc:
-            while time.time() < deadline:
-                try:
-                    os.stat(filename)
-                except FileNotFoundError as e:
-                    assert e.winerror == 2  # ERROR_FILE_NOT_FOUND
-            try:
-                proc.wait(1)
-            except subprocess.TimeoutExpired:
-                proc.terminate()
-
-    @support.requires_subprocess()
-    def test_stat_inaccessible_file(self):
-        filename = os_helper.TESTFN
-        ICACLS = os.path.expandvars(r"%SystemRoot%\System32\icacls.exe")
-
-        with open(filename, "wb") as f:
-            f.write(b'Test data')
-
-        stat1 = os.stat(filename)
-
-        try:
-            # Remove all permissions from the file
-            subprocess.check_output([ICACLS, filename, "/inheritance:r"],
-                                    stderr=subprocess.STDOUT)
-        except subprocess.CalledProcessError as ex:
-            if support.verbose:
-                print(ICACLS, filename, "/inheritance:r", "failed.")
-                print(ex.stdout.decode("oem", "replace").rstrip())
-            try:
-                os.unlink(filename)
-            except OSError:
-                pass
-            self.skipTest("Unable to create inaccessible file")
-
-        def cleanup():
-            # Give delete permission to the owner (us)
-            subprocess.check_output([ICACLS, filename, "/grant", "*WD:(D)"],
-                                    stderr=subprocess.STDOUT)
-            os.unlink(filename)
-
-        self.addCleanup(cleanup)
-
-        if support.verbose:
-            print("File:", filename)
-            print("stat with access:", stat1)
-
-        # First test - we shouldn't raise here, because we still have access to
-        # the directory and can extract enough information from its metadata.
-        stat2 = os.stat(filename)
-
-        if support.verbose:
-            print(" without access:", stat2)
-
-        # We may not get st_dev/st_ino, so ensure those are 0 or match
-        self.assertIn(stat2.st_dev, (0, stat1.st_dev))
-        self.assertIn(stat2.st_ino, (0, stat1.st_ino))
-
-        # st_mode and st_size should match (for a normal file, at least)
-        self.assertEqual(stat1.st_mode, stat2.st_mode)
-        self.assertEqual(stat1.st_size, stat2.st_size)
-
-        # st_ctime and st_mtime should be the same
-        self.assertEqual(stat1.st_ctime, stat2.st_ctime)
-        self.assertEqual(stat1.st_mtime, stat2.st_mtime)
-
-        # st_atime should be the same or later
-        self.assertGreaterEqual(stat1.st_atime, stat2.st_atime)
-
-
 @os_helper.skip_unless_symlink
 class NonLocalSymlinkTests(unittest.TestCase):
 
diff --git a/Lib/test/test_os/test_windows.py b/Lib/test/test_os/test_windows.py
new file mode 100644 (file)
index 0000000..b306a03
--- /dev/null
@@ -0,0 +1,640 @@
+import sys
+import unittest
+
+if sys.platform != "win32":
+    raise unittest.SkipTest("Win32 specific tests")
+
+import _winapi
+import fnmatch
+import mmap
+import os
+import shutil
+import signal
+import stat
+import subprocess
+import textwrap
+import time
+import uuid
+from test import support
+from test.support import import_helper
+from test.support import os_helper
+from .utils import create_file
+
+
+class Win32ErrorTests(unittest.TestCase):
+    def setUp(self):
+        try:
+            os.stat(os_helper.TESTFN)
+        except FileNotFoundError:
+            exists = False
+        except OSError as exc:
+            exists = True
+            self.fail("file %s must not exist; os.stat failed with %s"
+                      % (os_helper.TESTFN, exc))
+        else:
+            self.fail("file %s must not exist" % os_helper.TESTFN)
+
+    def test_rename(self):
+        self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
+
+    def test_remove(self):
+        self.assertRaises(OSError, os.remove, os_helper.TESTFN)
+
+    def test_chdir(self):
+        self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
+
+    def test_mkdir(self):
+        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
+
+        with open(os_helper.TESTFN, "x") as f:
+            self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
+
+    def test_utime(self):
+        self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
+
+    def test_chmod(self):
+        self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
+
+
+class Win32KillTests(unittest.TestCase):
+    def _kill(self, sig):
+        # Start sys.executable as a subprocess and communicate from the
+        # subprocess to the parent that the interpreter is ready. When it
+        # becomes ready, send *sig* via os.kill to the subprocess and check
+        # that the return code is equal to *sig*.
+        import ctypes
+        from ctypes import wintypes
+        import msvcrt
+
+        # Since we can't access the contents of the process' stdout until the
+        # process has exited, use PeekNamedPipe to see what's inside stdout
+        # without waiting. This is done so we can tell that the interpreter
+        # is started and running at a point where it could handle a signal.
+        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
+        PeekNamedPipe.restype = wintypes.BOOL
+        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
+                                  ctypes.POINTER(ctypes.c_char), # stdout buf
+                                  wintypes.DWORD, # Buffer size
+                                  ctypes.POINTER(wintypes.DWORD), # bytes read
+                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
+                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
+        msg = "running"
+        proc = subprocess.Popen([sys.executable, "-c",
+                                 "import sys;"
+                                 "sys.stdout.write('{}');"
+                                 "sys.stdout.flush();"
+                                 "input()".format(msg)],
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE,
+                                stdin=subprocess.PIPE)
+        self.addCleanup(proc.stdout.close)
+        self.addCleanup(proc.stderr.close)
+        self.addCleanup(proc.stdin.close)
+
+        count, max = 0, 100
+        while count < max and proc.poll() is None:
+            # Create a string buffer to store the result of stdout from the pipe
+            buf = ctypes.create_string_buffer(len(msg))
+            # Obtain the text currently in proc.stdout
+            # Bytes read/avail/left are left as NULL and unused
+            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
+                                 buf, ctypes.sizeof(buf), None, None, None)
+            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
+            if buf.value:
+                self.assertEqual(msg, buf.value.decode())
+                break
+            time.sleep(0.1)
+            count += 1
+        else:
+            self.fail("Did not receive communication from the subprocess")
+
+        os.kill(proc.pid, sig)
+        self.assertEqual(proc.wait(), sig)
+
+    def test_kill_sigterm(self):
+        # SIGTERM doesn't mean anything special, but make sure it works
+        self._kill(signal.SIGTERM)
+
+    def test_kill_int(self):
+        # os.kill on Windows can take an int which gets set as the exit code
+        self._kill(100)
+
+    @unittest.skipIf(mmap is None, "requires mmap")
+    def _kill_with_event(self, event, name):
+        tagname = "test_os_%s" % uuid.uuid1()
+        m = mmap.mmap(-1, 1, tagname)
+        m[0] = 0
+
+        # Run a script which has console control handling enabled.
+        script = os.path.join(os.path.dirname(__file__),
+                              "win_console_handler.py")
+        cmd = [sys.executable, script, tagname]
+        proc = subprocess.Popen(cmd,
+                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
+
+        with proc:
+            # Let the interpreter startup before we send signals. See #3137.
+            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
+                if proc.poll() is None:
+                    break
+            else:
+                # Forcefully kill the process if we weren't able to signal it.
+                proc.kill()
+                self.fail("Subprocess didn't finish initialization")
+
+            os.kill(proc.pid, event)
+
+            try:
+                # proc.send_signal(event) could also be done here.
+                # Allow time for the signal to be passed and the process to exit.
+                proc.wait(timeout=support.SHORT_TIMEOUT)
+            except subprocess.TimeoutExpired:
+                # Forcefully kill the process if we weren't able to signal it.
+                proc.kill()
+                self.fail("subprocess did not stop on {}".format(name))
+
+    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
+    @support.requires_subprocess()
+    def test_CTRL_C_EVENT(self):
+        from ctypes import wintypes
+        import ctypes
+
+        # Make a NULL value by creating a pointer with no argument.
+        NULL = ctypes.POINTER(ctypes.c_int)()
+        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
+        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
+                                          wintypes.BOOL)
+        SetConsoleCtrlHandler.restype = wintypes.BOOL
+
+        # Calling this with NULL and FALSE causes the calling process to
+        # handle Ctrl+C, rather than ignore it. This property is inherited
+        # by subprocesses.
+        SetConsoleCtrlHandler(NULL, 0)
+
+        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
+
+    @support.requires_subprocess()
+    def test_CTRL_BREAK_EVENT(self):
+        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
+
+
+class Win32ListdirTests(unittest.TestCase):
+    """Test listdir on Windows."""
+
+    def setUp(self):
+        self.created_paths = []
+        for i in range(2):
+            dir_name = 'SUB%d' % i
+            dir_path = os.path.join(os_helper.TESTFN, dir_name)
+            file_name = 'FILE%d' % i
+            file_path = os.path.join(os_helper.TESTFN, file_name)
+            os.makedirs(dir_path)
+            with open(file_path, 'w', encoding='utf-8') as f:
+                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
+            self.created_paths.extend([dir_name, file_name])
+        self.created_paths.sort()
+
+    def tearDown(self):
+        shutil.rmtree(os_helper.TESTFN)
+
+    def test_listdir_no_extended_path(self):
+        """Test when the path is not an "extended" path."""
+        # unicode
+        self.assertEqual(
+                sorted(os.listdir(os_helper.TESTFN)),
+                self.created_paths)
+
+        # bytes
+        self.assertEqual(
+                sorted(os.listdir(os.fsencode(os_helper.TESTFN))),
+                [os.fsencode(path) for path in self.created_paths])
+
+    def test_listdir_extended_path(self):
+        """Test when the path starts with '\\\\?\\'."""
+        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
+        # unicode
+        path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
+        self.assertEqual(
+                sorted(os.listdir(path)),
+                self.created_paths)
+
+        # bytes
+        path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
+        self.assertEqual(
+                sorted(os.listdir(path)),
+                [os.fsencode(path) for path in self.created_paths])
+
+
+@unittest.skipUnless(os.name == "nt", "NT specific tests")
+class Win32ListdriveTests(unittest.TestCase):
+    """Test listdrive, listmounts and listvolume on Windows."""
+
+    def setUp(self):
+        # Get drives and volumes from fsutil
+        out = subprocess.check_output(
+            ["fsutil.exe", "volume", "list"],
+            cwd=os.path.join(os.getenv("SystemRoot", "\\Windows"), "System32"),
+            encoding="mbcs",
+            errors="ignore",
+        )
+        lines = out.splitlines()
+        self.known_volumes = {l for l in lines if l.startswith('\\\\?\\')}
+        self.known_drives = {l for l in lines if l[1:] == ':\\'}
+        self.known_mounts = {l for l in lines if l[1:3] == ':\\'}
+
+    def test_listdrives(self):
+        drives = os.listdrives()
+        self.assertIsInstance(drives, list)
+        self.assertSetEqual(
+            self.known_drives,
+            self.known_drives & set(drives),
+        )
+
+    def test_listvolumes(self):
+        volumes = os.listvolumes()
+        self.assertIsInstance(volumes, list)
+        self.assertSetEqual(
+            self.known_volumes,
+            self.known_volumes & set(volumes),
+        )
+
+    def test_listmounts(self):
+        for volume in os.listvolumes():
+            try:
+                mounts = os.listmounts(volume)
+            except OSError as ex:
+                if support.verbose:
+                    print("Skipping", volume, "because of", ex)
+            else:
+                self.assertIsInstance(mounts, list)
+                self.assertSetEqual(
+                    set(mounts),
+                    self.known_mounts & set(mounts),
+                )
+
+
+@os_helper.skip_unless_symlink
+class Win32SymlinkTests(unittest.TestCase):
+    filelink = 'filelinktest'
+    filelink_target = os.path.abspath(__file__)
+    dirlink = 'dirlinktest'
+    dirlink_target = os.path.dirname(filelink_target)
+    missing_link = 'missing link'
+
+    def setUp(self):
+        assert os.path.exists(self.dirlink_target)
+        assert os.path.exists(self.filelink_target)
+        assert not os.path.exists(self.dirlink)
+        assert not os.path.exists(self.filelink)
+        assert not os.path.exists(self.missing_link)
+
+    def tearDown(self):
+        if os.path.exists(self.filelink):
+            os.remove(self.filelink)
+        if os.path.exists(self.dirlink):
+            os.rmdir(self.dirlink)
+        if os.path.lexists(self.missing_link):
+            os.remove(self.missing_link)
+
+    def test_directory_link(self):
+        os.symlink(self.dirlink_target, self.dirlink)
+        self.assertTrue(os.path.exists(self.dirlink))
+        self.assertTrue(os.path.isdir(self.dirlink))
+        self.assertTrue(os.path.islink(self.dirlink))
+        self.check_stat(self.dirlink, self.dirlink_target)
+
+    def test_file_link(self):
+        os.symlink(self.filelink_target, self.filelink)
+        self.assertTrue(os.path.exists(self.filelink))
+        self.assertTrue(os.path.isfile(self.filelink))
+        self.assertTrue(os.path.islink(self.filelink))
+        self.check_stat(self.filelink, self.filelink_target)
+
+    def _create_missing_dir_link(self):
+        'Create a "directory" link to a non-existent target'
+        linkname = self.missing_link
+        if os.path.lexists(linkname):
+            os.remove(linkname)
+        target = r'c:\\target does not exist.29r3c740'
+        assert not os.path.exists(target)
+        target_is_dir = True
+        os.symlink(target, linkname, target_is_dir)
+
+    def test_remove_directory_link_to_missing_target(self):
+        self._create_missing_dir_link()
+        # For compatibility with Unix, os.remove will check the
+        #  directory status and call RemoveDirectory if the symlink
+        #  was created with target_is_dir==True.
+        os.remove(self.missing_link)
+
+    def test_isdir_on_directory_link_to_missing_target(self):
+        self._create_missing_dir_link()
+        self.assertFalse(os.path.isdir(self.missing_link))
+
+    def test_rmdir_on_directory_link_to_missing_target(self):
+        self._create_missing_dir_link()
+        os.rmdir(self.missing_link)
+
+    def check_stat(self, link, target):
+        self.assertEqual(os.stat(link), os.stat(target))
+        self.assertNotEqual(os.lstat(link), os.stat(link))
+
+        bytes_link = os.fsencode(link)
+        self.assertEqual(os.stat(bytes_link), os.stat(target))
+        self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
+
+    def test_12084(self):
+        level1 = os.path.abspath(os_helper.TESTFN)
+        level2 = os.path.join(level1, "level2")
+        level3 = os.path.join(level2, "level3")
+        self.addCleanup(os_helper.rmtree, level1)
+
+        os.mkdir(level1)
+        os.mkdir(level2)
+        os.mkdir(level3)
+
+        file1 = os.path.abspath(os.path.join(level1, "file1"))
+        create_file(file1)
+
+        orig_dir = os.getcwd()
+        try:
+            os.chdir(level2)
+            link = os.path.join(level2, "link")
+            os.symlink(os.path.relpath(file1), "link")
+            self.assertIn("link", os.listdir(os.getcwd()))
+
+            # Check os.stat calls from the same dir as the link
+            self.assertEqual(os.stat(file1), os.stat("link"))
+
+            # Check os.stat calls from a dir below the link
+            os.chdir(level1)
+            self.assertEqual(os.stat(file1),
+                             os.stat(os.path.relpath(link)))
+
+            # Check os.stat calls from a dir above the link
+            os.chdir(level3)
+            self.assertEqual(os.stat(file1),
+                             os.stat(os.path.relpath(link)))
+        finally:
+            os.chdir(orig_dir)
+
+    @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
+                            and os.path.exists(r'C:\ProgramData'),
+                            'Test directories not found')
+    def test_29248(self):
+        # os.symlink() calls CreateSymbolicLink, which creates
+        # the reparse data buffer with the print name stored
+        # first, so the offset is always 0. CreateSymbolicLink
+        # stores the "PrintName" DOS path (e.g. "C:\") first,
+        # with an offset of 0, followed by the "SubstituteName"
+        # NT path (e.g. "\??\C:\"). The "All Users" link, on
+        # the other hand, seems to have been created manually
+        # with an inverted order.
+        target = os.readlink(r'C:\Users\All Users')
+        self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
+
+    def test_buffer_overflow(self):
+        # Older versions would have a buffer overflow when detecting
+        # whether a link source was a directory. This test ensures we
+        # no longer crash, but does not otherwise validate the behavior
+        segment = 'X' * 27
+        path = os.path.join(*[segment] * 10)
+        test_cases = [
+            # overflow with absolute src
+            ('\\' + path, segment),
+            # overflow dest with relative src
+            (segment, path),
+            # overflow when joining src
+            (path[:180], path[:180]),
+        ]
+        for src, dest in test_cases:
+            try:
+                os.symlink(src, dest)
+            except FileNotFoundError:
+                pass
+            else:
+                try:
+                    os.remove(dest)
+                except OSError:
+                    pass
+            # Also test with bytes, since that is a separate code path.
+            try:
+                os.symlink(os.fsencode(src), os.fsencode(dest))
+            except FileNotFoundError:
+                pass
+            else:
+                try:
+                    os.remove(dest)
+                except OSError:
+                    pass
+
+    def test_appexeclink(self):
+        root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
+        if not os.path.isdir(root):
+            self.skipTest("test requires a WindowsApps directory")
+
+        aliases = [os.path.join(root, a)
+                   for a in fnmatch.filter(os.listdir(root), '*.exe')]
+
+        for alias in aliases:
+            if support.verbose:
+                print()
+                print("Testing with", alias)
+            st = os.lstat(alias)
+            self.assertEqual(st, os.stat(alias))
+            self.assertFalse(stat.S_ISLNK(st.st_mode))
+            self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
+            self.assertTrue(os.path.isfile(alias))
+            # testing the first one we see is sufficient
+            break
+        else:
+            self.skipTest("test requires an app execution alias")
+
+
+class Win32JunctionTests(unittest.TestCase):
+    junction = 'junctiontest'
+    junction_target = os.path.dirname(os.path.abspath(__file__))
+
+    def setUp(self):
+        assert os.path.exists(self.junction_target)
+        assert not os.path.lexists(self.junction)
+
+    def tearDown(self):
+        if os.path.lexists(self.junction):
+            os.unlink(self.junction)
+
+    def test_create_junction(self):
+        _winapi.CreateJunction(self.junction_target, self.junction)
+        self.assertTrue(os.path.lexists(self.junction))
+        self.assertTrue(os.path.exists(self.junction))
+        self.assertTrue(os.path.isdir(self.junction))
+        self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
+        self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
+
+        # bpo-37834: Junctions are not recognized as links.
+        self.assertFalse(os.path.islink(self.junction))
+        self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
+                         os.path.normcase(os.readlink(self.junction)))
+
+    def test_unlink_removes_junction(self):
+        _winapi.CreateJunction(self.junction_target, self.junction)
+        self.assertTrue(os.path.exists(self.junction))
+        self.assertTrue(os.path.lexists(self.junction))
+
+        os.unlink(self.junction)
+        self.assertFalse(os.path.exists(self.junction))
+
+
+class Win32NtTests(unittest.TestCase):
+    def test_getfinalpathname_handles(self):
+        nt = import_helper.import_module('nt')
+        ctypes = import_helper.import_module('ctypes')
+        # Ruff false positive -- it thinks we're redefining `ctypes` here
+        import ctypes.wintypes  # noqa: F811
+
+        kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
+        kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
+
+        kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
+        kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
+                                                 ctypes.wintypes.LPDWORD)
+
+        # This is a pseudo-handle that doesn't need to be closed
+        hproc = kernel.GetCurrentProcess()
+
+        handle_count = ctypes.wintypes.DWORD()
+        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
+        self.assertEqual(1, ok)
+
+        before_count = handle_count.value
+
+        # The first two test the error path, __file__ tests the success path
+        filenames = [
+            r'\\?\C:',
+            r'\\?\NUL',
+            r'\\?\CONIN',
+            __file__,
+        ]
+
+        for _ in range(10):
+            for name in filenames:
+                try:
+                    nt._getfinalpathname(name)
+                except Exception:
+                    # Failure is expected
+                    pass
+                try:
+                    os.stat(name)
+                except Exception:
+                    pass
+
+        ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
+        self.assertEqual(1, ok)
+
+        handle_delta = handle_count.value - before_count
+
+        self.assertEqual(0, handle_delta)
+
+    @support.requires_subprocess()
+    def test_stat_unlink_race(self):
+        # bpo-46785: the implementation of os.stat() falls back to reading
+        # the parent directory if CreateFileW() fails with a permission
+        # error. If reading the parent directory fails because the file or
+        # directory are subsequently unlinked, or because the volume or
+        # share are no longer available, then the original permission error
+        # should not be restored.
+        filename =  os_helper.TESTFN
+        self.addCleanup(os_helper.unlink, filename)
+        deadline = time.time() + 5
+        command = textwrap.dedent("""\
+            import os
+            import sys
+            import time
+
+            filename = sys.argv[1]
+            deadline = float(sys.argv[2])
+
+            while time.time() < deadline:
+                try:
+                    with open(filename, "w") as f:
+                        pass
+                except OSError:
+                    pass
+                try:
+                    os.remove(filename)
+                except OSError:
+                    pass
+            """)
+
+        with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc:
+            while time.time() < deadline:
+                try:
+                    os.stat(filename)
+                except FileNotFoundError as e:
+                    assert e.winerror == 2  # ERROR_FILE_NOT_FOUND
+            try:
+                proc.wait(1)
+            except subprocess.TimeoutExpired:
+                proc.terminate()
+
+    @support.requires_subprocess()
+    def test_stat_inaccessible_file(self):
+        filename = os_helper.TESTFN
+        ICACLS = os.path.expandvars(r"%SystemRoot%\System32\icacls.exe")
+
+        with open(filename, "wb") as f:
+            f.write(b'Test data')
+
+        stat1 = os.stat(filename)
+
+        try:
+            # Remove all permissions from the file
+            subprocess.check_output([ICACLS, filename, "/inheritance:r"],
+                                    stderr=subprocess.STDOUT)
+        except subprocess.CalledProcessError as ex:
+            if support.verbose:
+                print(ICACLS, filename, "/inheritance:r", "failed.")
+                print(ex.stdout.decode("oem", "replace").rstrip())
+            try:
+                os.unlink(filename)
+            except OSError:
+                pass
+            self.skipTest("Unable to create inaccessible file")
+
+        def cleanup():
+            # Give delete permission to the owner (us)
+            subprocess.check_output([ICACLS, filename, "/grant", "*WD:(D)"],
+                                    stderr=subprocess.STDOUT)
+            os.unlink(filename)
+
+        self.addCleanup(cleanup)
+
+        if support.verbose:
+            print("File:", filename)
+            print("stat with access:", stat1)
+
+        # First test - we shouldn't raise here, because we still have access to
+        # the directory and can extract enough information from its metadata.
+        stat2 = os.stat(filename)
+
+        if support.verbose:
+            print(" without access:", stat2)
+
+        # We may not get st_dev/st_ino, so ensure those are 0 or match
+        self.assertIn(stat2.st_dev, (0, stat1.st_dev))
+        self.assertIn(stat2.st_ino, (0, stat1.st_ino))
+
+        # st_mode and st_size should match (for a normal file, at least)
+        self.assertEqual(stat1.st_mode, stat2.st_mode)
+        self.assertEqual(stat1.st_size, stat2.st_size)
+
+        # st_ctime and st_mtime should be the same
+        self.assertEqual(stat1.st_ctime, stat2.st_ctime)
+        self.assertEqual(stat1.st_mtime, stat2.st_mtime)
+
+        # st_atime should be the same or later
+        self.assertGreaterEqual(stat1.st_atime, stat2.st_atime)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/Lib/test/test_os/utils.py b/Lib/test/test_os/utils.py
new file mode 100644 (file)
index 0000000..e0c3959
--- /dev/null
@@ -0,0 +1,3 @@
+def create_file(filename, content=b'content'):
+    with open(filename, "xb", 0) as fp:
+        fp.write(content)
index eedccc3ffe6a49f0412aa8df03afed5df0f0c769..6651b093e20c8dc8781607189bc1acc1d0e449ad 100644 (file)
@@ -2682,6 +2682,7 @@ TESTSUBDIRS=      idlelib/idle_test \
                test/test_multiprocessing_fork \
                test/test_multiprocessing_forkserver \
                test/test_multiprocessing_spawn \
+               test/test_os \
                test/test_pathlib \
                test/test_pathlib/support \
                test/test_peg_generator \