it's developed alongside pathlib. If it finds success and maturity as a PyPI
package, it could become a public part of the standard library.
-Two base classes are defined here -- PurePathBase and PathBase -- that
-resemble pathlib's PurePath and Path respectively.
+Three base classes are defined here -- JoinablePath, ReadablePath and
+WritablePath.
"""
import functools
return path.with_segments(str(path) + text)
-class CopyWorker:
+class CopyReader:
"""
Class that implements copying between path objects. An instance of this
- class is available from the PathBase.copy property; it's made callable so
- that PathBase.copy() can be treated as a method.
+ class is available from the ReadablePath.copy property; it's made callable
+ so that ReadablePath.copy() can be treated as a method.
- The target path's CopyWorker drives the process from its _create() method.
+ The target path's CopyWriter drives the process from its _create() method.
Files and directories are exchanged by calling methods on the source and
target paths, and metadata is exchanged by calling
source.copy._read_metadata() and target.copy._write_metadata().
"""
Recursively copy this file or directory tree to the given destination.
"""
- if not isinstance(target, PathBase):
+ if not isinstance(target, ReadablePath):
target = self._path.with_segments(target)
- # Delegate to the target path's CopyWorker object.
- return target.copy._create(self._path, follow_symlinks, dirs_exist_ok, preserve_metadata)
+ # Delegate to the target path's CopyWriter object.
+ try:
+ create = target.copy._create
+ except AttributeError:
+ raise TypeError(f"Target is not writable: {target}") from None
+ return create(self._path, follow_symlinks, dirs_exist_ok, preserve_metadata)
_readable_metakeys = frozenset()
"""
raise NotImplementedError
+
+class CopyWriter(CopyReader):
+ __slots__ = ()
+
_writable_metakeys = frozenset()
def _write_metadata(self, metadata, *, follow_symlinks=True):
raise err
-class PurePathBase:
+class JoinablePath:
"""Base class for pure path objects.
This class *does not* provide several magic methods that are defined in
is matched. The recursive wildcard '**' is *not* supported by this
method.
"""
- if not isinstance(path_pattern, PurePathBase):
+ if not isinstance(path_pattern, JoinablePath):
path_pattern = self.with_segments(path_pattern)
if case_sensitive is None:
case_sensitive = _is_case_sensitive(self.parser)
Return True if this path matches the given glob-style pattern. The
pattern is matched against the entire path.
"""
- if not isinstance(pattern, PurePathBase):
+ if not isinstance(pattern, JoinablePath):
pattern = self.with_segments(pattern)
if case_sensitive is None:
case_sensitive = _is_case_sensitive(self.parser)
-class PathBase(PurePathBase):
+class ReadablePath(JoinablePath):
"""Base class for concrete path objects.
This class provides dummy implementations for many methods that derived
with self.open(mode='r', encoding=encoding, errors=errors, newline=newline) as f:
return f.read()
- def write_bytes(self, data):
- """
- Open the file in bytes mode, write to it, and close the file.
- """
- # type-check for the buffer interface before truncating the file
- view = memoryview(data)
- with self.open(mode='wb') as f:
- return f.write(view)
-
- def write_text(self, data, encoding=None, errors=None, newline=None):
- """
- Open the file in text mode, write to it, and close the file.
- """
- if not isinstance(data, str):
- raise TypeError('data must be str, not %s' %
- data.__class__.__name__)
- with self.open(mode='w', encoding=encoding, errors=errors, newline=newline) as f:
- return f.write(data)
-
def _scandir(self):
"""Yield os.DirEntry-like objects of the directory contents.
"""Iterate over this subtree and yield all existing files (of any
kind, including directories) matching the given relative pattern.
"""
- if not isinstance(pattern, PurePathBase):
+ if not isinstance(pattern, JoinablePath):
pattern = self.with_segments(pattern)
anchor, parts = _explode_path(pattern)
if anchor:
directories) matching the given relative pattern, anywhere in
this subtree.
"""
- if not isinstance(pattern, PurePathBase):
+ if not isinstance(pattern, JoinablePath):
pattern = self.with_segments(pattern)
pattern = '**' / pattern
return self.glob(pattern, case_sensitive=case_sensitive, recurse_symlinks=recurse_symlinks)
"""
raise NotImplementedError
+ copy = property(CopyReader, doc=CopyReader.__call__.__doc__)
+
+ def copy_into(self, target_dir, *, follow_symlinks=True,
+ dirs_exist_ok=False, preserve_metadata=False):
+ """
+ Copy this file or directory tree into the given existing directory.
+ """
+ name = self.name
+ if not name:
+ raise ValueError(f"{self!r} has an empty name")
+ elif isinstance(target_dir, ReadablePath):
+ target = target_dir / name
+ else:
+ target = self.with_segments(target_dir, name)
+ return self.copy(target, follow_symlinks=follow_symlinks,
+ dirs_exist_ok=dirs_exist_ok,
+ preserve_metadata=preserve_metadata)
+
+
+class WritablePath(ReadablePath):
+ __slots__ = ()
+
def symlink_to(self, target, target_is_directory=False):
"""
Make this path a symlink pointing to the target path.
"""
raise NotImplementedError
- copy = property(CopyWorker, doc=CopyWorker.__call__.__doc__)
+ def write_bytes(self, data):
+ """
+ Open the file in bytes mode, write to it, and close the file.
+ """
+ # type-check for the buffer interface before truncating the file
+ view = memoryview(data)
+ with self.open(mode='wb') as f:
+ return f.write(view)
- def copy_into(self, target_dir, *, follow_symlinks=True,
- dirs_exist_ok=False, preserve_metadata=False):
+ def write_text(self, data, encoding=None, errors=None, newline=None):
"""
- Copy this file or directory tree into the given existing directory.
+ Open the file in text mode, write to it, and close the file.
"""
- name = self.name
- if not name:
- raise ValueError(f"{self!r} has an empty name")
- elif isinstance(target_dir, PathBase):
- target = target_dir / name
- else:
- target = self.with_segments(target_dir, name)
- return self.copy(target, follow_symlinks=follow_symlinks,
- dirs_exist_ok=dirs_exist_ok,
- preserve_metadata=preserve_metadata)
+ if not isinstance(data, str):
+ raise TypeError('data must be str, not %s' %
+ data.__class__.__name__)
+ with self.open(mode='w', encoding=encoding, errors=errors, newline=newline) as f:
+ return f.write(data)
+
+ copy = property(CopyWriter, doc=CopyWriter.__call__.__doc__)
import errno
import unittest
-from pathlib._abc import PurePathBase, PathBase
+from pathlib._abc import JoinablePath, ReadablePath, WritablePath
from pathlib._types import Parser
import posixpath
#
-class PurePathBaseTest(unittest.TestCase):
- cls = PurePathBase
+class JoinablePathTest(unittest.TestCase):
+ cls = JoinablePath
def test_magic_methods(self):
P = self.cls
self.assertIs(self.cls.parser, posixpath)
-class DummyPurePath(PurePathBase):
+class DummyJoinablePath(JoinablePath):
__slots__ = ('_segments',)
def __init__(self, *segments):
return ''
def __eq__(self, other):
- if not isinstance(other, DummyPurePath):
+ if not isinstance(other, DummyJoinablePath):
return NotImplemented
return str(self) == str(other)
return type(self)(*pathsegments)
-class DummyPurePathTest(unittest.TestCase):
- cls = DummyPurePath
+class DummyJoinablePathTest(unittest.TestCase):
+ cls = DummyJoinablePath
# Use a base path that's unrelated to any real filesystem path.
base = f'/this/path/kills/fascists/{TESTFN}'
#
-class DummyPathIO(io.BytesIO):
+class DummyWritablePathIO(io.BytesIO):
"""
- Used by DummyPath to implement `open('w')`
+ Used by DummyWritablePath to implement `open('w')`
"""
def __init__(self, files, path):
super().close()
-class DummyPath(PathBase):
+class DummyReadablePath(ReadablePath):
"""
- Simple implementation of PathBase that keeps files and directories in
- memory.
+ Simple implementation of DummyReadablePath that keeps files and
+ directories in memory.
"""
__slots__ = ('_segments')
return ''
def __eq__(self, other):
- if not isinstance(other, DummyPath):
+ if not isinstance(other, DummyReadablePath):
return NotImplemented
return str(self) == str(other)
raise FileNotFoundError(errno.ENOENT, "File not found", path)
stream = io.BytesIO(self._files[path])
elif mode == 'w':
+ # FIXME: move to DummyWritablePath
parent, name = posixpath.split(path)
if parent not in self._directories:
raise FileNotFoundError(errno.ENOENT, "File not found", parent)
- stream = DummyPathIO(self._files, path)
+ stream = DummyWritablePathIO(self._files, path)
self._files[path] = b''
self._directories[parent].add(name)
else:
else:
raise FileNotFoundError(errno.ENOENT, "File not found", path)
+
+class DummyWritablePath(DummyReadablePath, WritablePath):
+ __slots__ = ()
+
def mkdir(self, mode=0o777, parents=False, exist_ok=False):
path = str(self)
parent = str(self.parent)
self.parent.mkdir(parents=True, exist_ok=True)
self.mkdir(mode, parents=False, exist_ok=exist_ok)
- def _delete(self):
- path = str(self)
- if path in self._files:
- del self._files[path]
- elif path in self._directories:
- for name in list(self._directories[path]):
- self.joinpath(name)._delete()
- del self._directories[path]
- else:
- raise FileNotFoundError(errno.ENOENT, "File not found", path)
- parent = str(self.parent)
- self._directories[parent].remove(self.name)
-
-class DummyPathTest(DummyPurePathTest):
- """Tests for PathBase methods that use stat(), open() and iterdir()."""
+class DummyReadablePathTest(DummyJoinablePathTest):
+ """Tests for ReadablePathTest methods that use stat(), open() and iterdir()."""
- cls = DummyPath
+ cls = DummyReadablePath
can_symlink = False
# (self.base)
self.assertIsInstance(f, io.BufferedIOBase)
self.assertEqual(f.read().strip(), b"this is file A")
- def test_read_write_bytes(self):
- p = self.cls(self.base)
- (p / 'fileA').write_bytes(b'abcdefg')
- self.assertEqual((p / 'fileA').read_bytes(), b'abcdefg')
- # Check that trying to write str does not truncate the file.
- self.assertRaises(TypeError, (p / 'fileA').write_bytes, 'somestr')
- self.assertEqual((p / 'fileA').read_bytes(), b'abcdefg')
-
- def test_read_write_text(self):
- p = self.cls(self.base)
- (p / 'fileA').write_text('äbcdefg', encoding='latin-1')
- self.assertEqual((p / 'fileA').read_text(
- encoding='utf-8', errors='ignore'), 'bcdefg')
- # Check that trying to write bytes does not truncate the file.
- self.assertRaises(TypeError, (p / 'fileA').write_text, b'somebytes')
- self.assertEqual((p / 'fileA').read_text(encoding='latin-1'), 'äbcdefg')
-
- def test_read_text_with_newlines(self):
- p = self.cls(self.base)
- # Check that `\n` character change nothing
- (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
- self.assertEqual((p / 'fileA').read_text(newline='\n'),
- 'abcde\r\nfghlk\n\rmnopq')
- # Check that `\r` character replaces `\n`
- (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
- self.assertEqual((p / 'fileA').read_text(newline='\r'),
- 'abcde\r\nfghlk\n\rmnopq')
- # Check that `\r\n` character replaces `\n`
- (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
- self.assertEqual((p / 'fileA').read_text(newline='\r\n'),
- 'abcde\r\nfghlk\n\rmnopq')
-
- def test_write_text_with_newlines(self):
- p = self.cls(self.base)
- # Check that `\n` character change nothing
- (p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\n')
- self.assertEqual((p / 'fileA').read_bytes(),
- b'abcde\r\nfghlk\n\rmnopq')
- # Check that `\r` character replaces `\n`
- (p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\r')
- self.assertEqual((p / 'fileA').read_bytes(),
- b'abcde\r\rfghlk\r\rmnopq')
- # Check that `\r\n` character replaces `\n`
- (p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\r\n')
- self.assertEqual((p / 'fileA').read_bytes(),
- b'abcde\r\r\nfghlk\r\n\rmnopq')
- # Check that no argument passed will change `\n` to `os.linesep`
- os_linesep_byte = bytes(os.linesep, encoding='ascii')
- (p / 'fileA').write_text('abcde\nfghlk\n\rmnopq')
- self.assertEqual((p / 'fileA').read_bytes(),
- b'abcde' + os_linesep_byte + b'fghlk' + os_linesep_byte + b'\rmnopq')
-
- def test_copy_file(self):
- base = self.cls(self.base)
- source = base / 'fileA'
- target = base / 'copyA'
- result = source.copy(target)
- self.assertEqual(result, target)
- self.assertTrue(target.exists())
- self.assertEqual(source.read_text(), target.read_text())
-
- def test_copy_file_to_existing_file(self):
- base = self.cls(self.base)
- source = base / 'fileA'
- target = base / 'dirB' / 'fileB'
- result = source.copy(target)
- self.assertEqual(result, target)
- self.assertTrue(target.exists())
- self.assertEqual(source.read_text(), target.read_text())
-
- def test_copy_file_to_existing_directory(self):
- base = self.cls(self.base)
- source = base / 'fileA'
- target = base / 'dirA'
- self.assertRaises(OSError, source.copy, target)
-
- def test_copy_file_empty(self):
- base = self.cls(self.base)
- source = base / 'empty'
- target = base / 'copyA'
- source.write_bytes(b'')
- result = source.copy(target)
- self.assertEqual(result, target)
- self.assertTrue(target.exists())
- self.assertEqual(target.read_bytes(), b'')
-
- def test_copy_file_to_itself(self):
- base = self.cls(self.base)
- source = base / 'empty'
- source.write_bytes(b'')
- self.assertRaises(OSError, source.copy, source)
- self.assertRaises(OSError, source.copy, source, follow_symlinks=False)
-
- def test_copy_dir_simple(self):
- base = self.cls(self.base)
- source = base / 'dirC'
- target = base / 'copyC'
- result = source.copy(target)
- self.assertEqual(result, target)
- self.assertTrue(target.is_dir())
- self.assertTrue(target.joinpath('dirD').is_dir())
- self.assertTrue(target.joinpath('dirD', 'fileD').is_file())
- self.assertEqual(target.joinpath('dirD', 'fileD').read_text(),
- "this is file D\n")
- self.assertTrue(target.joinpath('fileC').is_file())
- self.assertTrue(target.joinpath('fileC').read_text(),
- "this is file C\n")
-
- def test_copy_dir_complex(self, follow_symlinks=True):
- def ordered_walk(path):
- for dirpath, dirnames, filenames in path.walk(follow_symlinks=follow_symlinks):
- dirnames.sort()
- filenames.sort()
- yield dirpath, dirnames, filenames
- base = self.cls(self.base)
- source = base / 'dirC'
-
- if self.can_symlink:
- # Add some symlinks
- source.joinpath('linkC').symlink_to('fileC')
- source.joinpath('linkD').symlink_to('dirD', target_is_directory=True)
-
- # Perform the copy
- target = base / 'copyC'
- result = source.copy(target, follow_symlinks=follow_symlinks)
- self.assertEqual(result, target)
-
- # Compare the source and target trees
- source_walk = ordered_walk(source)
- target_walk = ordered_walk(target)
- for source_item, target_item in zip(source_walk, target_walk, strict=True):
- self.assertEqual(source_item[0].parts[len(source.parts):],
- target_item[0].parts[len(target.parts):]) # dirpath
- self.assertEqual(source_item[1], target_item[1]) # dirnames
- self.assertEqual(source_item[2], target_item[2]) # filenames
- # Compare files and symlinks
- for filename in source_item[2]:
- source_file = source_item[0].joinpath(filename)
- target_file = target_item[0].joinpath(filename)
- if follow_symlinks or not source_file.is_symlink():
- # Regular file.
- self.assertEqual(source_file.read_bytes(), target_file.read_bytes())
- elif source_file.is_dir():
- # Symlink to directory.
- self.assertTrue(target_file.is_dir())
- self.assertEqual(source_file.readlink(), target_file.readlink())
- else:
- # Symlink to file.
- self.assertEqual(source_file.read_bytes(), target_file.read_bytes())
- self.assertEqual(source_file.readlink(), target_file.readlink())
-
- def test_copy_dir_complex_follow_symlinks_false(self):
- self.test_copy_dir_complex(follow_symlinks=False)
-
- def test_copy_dir_to_existing_directory(self):
- base = self.cls(self.base)
- source = base / 'dirC'
- target = base / 'copyC'
- target.mkdir()
- target.joinpath('dirD').mkdir()
- self.assertRaises(FileExistsError, source.copy, target)
-
- def test_copy_dir_to_existing_directory_dirs_exist_ok(self):
- base = self.cls(self.base)
- source = base / 'dirC'
- target = base / 'copyC'
- target.mkdir()
- target.joinpath('dirD').mkdir()
- result = source.copy(target, dirs_exist_ok=True)
- self.assertEqual(result, target)
- self.assertTrue(target.is_dir())
- self.assertTrue(target.joinpath('dirD').is_dir())
- self.assertTrue(target.joinpath('dirD', 'fileD').is_file())
- self.assertEqual(target.joinpath('dirD', 'fileD').read_text(),
- "this is file D\n")
- self.assertTrue(target.joinpath('fileC').is_file())
- self.assertTrue(target.joinpath('fileC').read_text(),
- "this is file C\n")
-
- def test_copy_dir_to_itself(self):
- base = self.cls(self.base)
- source = base / 'dirC'
- self.assertRaises(OSError, source.copy, source)
- self.assertRaises(OSError, source.copy, source, follow_symlinks=False)
-
- def test_copy_dir_into_itself(self):
- base = self.cls(self.base)
- source = base / 'dirC'
- target = base / 'dirC' / 'dirD' / 'copyC'
- self.assertRaises(OSError, source.copy, target)
- self.assertRaises(OSError, source.copy, target, follow_symlinks=False)
- self.assertFalse(target.exists())
-
- def test_copy_into(self):
- base = self.cls(self.base)
- source = base / 'fileA'
- target_dir = base / 'dirA'
- result = source.copy_into(target_dir)
- self.assertEqual(result, target_dir / 'fileA')
- self.assertTrue(result.exists())
- self.assertEqual(source.read_text(), result.read_text())
-
- def test_copy_into_empty_name(self):
- source = self.cls('')
- target_dir = self.base
- self.assertRaises(ValueError, source.copy_into, target_dir)
-
def test_iterdir(self):
P = self.cls
p = P(self.base)
self.assertIs((P / 'linkA\x00').is_file(), False)
-class DummyPathWalkTest(unittest.TestCase):
- cls = DummyPath
- base = DummyPathTest.base
+class DummyWritablePathTest(DummyReadablePathTest):
+ cls = DummyWritablePath
+
+ def test_read_write_bytes(self):
+ p = self.cls(self.base)
+ (p / 'fileA').write_bytes(b'abcdefg')
+ self.assertEqual((p / 'fileA').read_bytes(), b'abcdefg')
+ # Check that trying to write str does not truncate the file.
+ self.assertRaises(TypeError, (p / 'fileA').write_bytes, 'somestr')
+ self.assertEqual((p / 'fileA').read_bytes(), b'abcdefg')
+
+ def test_read_write_text(self):
+ p = self.cls(self.base)
+ (p / 'fileA').write_text('äbcdefg', encoding='latin-1')
+ self.assertEqual((p / 'fileA').read_text(
+ encoding='utf-8', errors='ignore'), 'bcdefg')
+ # Check that trying to write bytes does not truncate the file.
+ self.assertRaises(TypeError, (p / 'fileA').write_text, b'somebytes')
+ self.assertEqual((p / 'fileA').read_text(encoding='latin-1'), 'äbcdefg')
+
+ def test_read_text_with_newlines(self):
+ p = self.cls(self.base)
+ # Check that `\n` character change nothing
+ (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
+ self.assertEqual((p / 'fileA').read_text(newline='\n'),
+ 'abcde\r\nfghlk\n\rmnopq')
+ # Check that `\r` character replaces `\n`
+ (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
+ self.assertEqual((p / 'fileA').read_text(newline='\r'),
+ 'abcde\r\nfghlk\n\rmnopq')
+ # Check that `\r\n` character replaces `\n`
+ (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq')
+ self.assertEqual((p / 'fileA').read_text(newline='\r\n'),
+ 'abcde\r\nfghlk\n\rmnopq')
+
+ def test_write_text_with_newlines(self):
+ p = self.cls(self.base)
+ # Check that `\n` character change nothing
+ (p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\n')
+ self.assertEqual((p / 'fileA').read_bytes(),
+ b'abcde\r\nfghlk\n\rmnopq')
+ # Check that `\r` character replaces `\n`
+ (p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\r')
+ self.assertEqual((p / 'fileA').read_bytes(),
+ b'abcde\r\rfghlk\r\rmnopq')
+ # Check that `\r\n` character replaces `\n`
+ (p / 'fileA').write_text('abcde\r\nfghlk\n\rmnopq', newline='\r\n')
+ self.assertEqual((p / 'fileA').read_bytes(),
+ b'abcde\r\r\nfghlk\r\n\rmnopq')
+ # Check that no argument passed will change `\n` to `os.linesep`
+ os_linesep_byte = bytes(os.linesep, encoding='ascii')
+ (p / 'fileA').write_text('abcde\nfghlk\n\rmnopq')
+ self.assertEqual((p / 'fileA').read_bytes(),
+ b'abcde' + os_linesep_byte + b'fghlk' + os_linesep_byte + b'\rmnopq')
+
+ def test_copy_file(self):
+ base = self.cls(self.base)
+ source = base / 'fileA'
+ target = base / 'copyA'
+ result = source.copy(target)
+ self.assertEqual(result, target)
+ self.assertTrue(target.exists())
+ self.assertEqual(source.read_text(), target.read_text())
+
+ def test_copy_file_to_existing_file(self):
+ base = self.cls(self.base)
+ source = base / 'fileA'
+ target = base / 'dirB' / 'fileB'
+ result = source.copy(target)
+ self.assertEqual(result, target)
+ self.assertTrue(target.exists())
+ self.assertEqual(source.read_text(), target.read_text())
+
+ def test_copy_file_to_existing_directory(self):
+ base = self.cls(self.base)
+ source = base / 'fileA'
+ target = base / 'dirA'
+ self.assertRaises(OSError, source.copy, target)
+
+ def test_copy_file_empty(self):
+ base = self.cls(self.base)
+ source = base / 'empty'
+ target = base / 'copyA'
+ source.write_bytes(b'')
+ result = source.copy(target)
+ self.assertEqual(result, target)
+ self.assertTrue(target.exists())
+ self.assertEqual(target.read_bytes(), b'')
+
+ def test_copy_file_to_itself(self):
+ base = self.cls(self.base)
+ source = base / 'empty'
+ source.write_bytes(b'')
+ self.assertRaises(OSError, source.copy, source)
+ self.assertRaises(OSError, source.copy, source, follow_symlinks=False)
+
+ def test_copy_dir_simple(self):
+ base = self.cls(self.base)
+ source = base / 'dirC'
+ target = base / 'copyC'
+ result = source.copy(target)
+ self.assertEqual(result, target)
+ self.assertTrue(target.is_dir())
+ self.assertTrue(target.joinpath('dirD').is_dir())
+ self.assertTrue(target.joinpath('dirD', 'fileD').is_file())
+ self.assertEqual(target.joinpath('dirD', 'fileD').read_text(),
+ "this is file D\n")
+ self.assertTrue(target.joinpath('fileC').is_file())
+ self.assertTrue(target.joinpath('fileC').read_text(),
+ "this is file C\n")
+
+ def test_copy_dir_complex(self, follow_symlinks=True):
+ def ordered_walk(path):
+ for dirpath, dirnames, filenames in path.walk(follow_symlinks=follow_symlinks):
+ dirnames.sort()
+ filenames.sort()
+ yield dirpath, dirnames, filenames
+ base = self.cls(self.base)
+ source = base / 'dirC'
+
+ if self.can_symlink:
+ # Add some symlinks
+ source.joinpath('linkC').symlink_to('fileC')
+ source.joinpath('linkD').symlink_to('dirD', target_is_directory=True)
+
+ # Perform the copy
+ target = base / 'copyC'
+ result = source.copy(target, follow_symlinks=follow_symlinks)
+ self.assertEqual(result, target)
+
+ # Compare the source and target trees
+ source_walk = ordered_walk(source)
+ target_walk = ordered_walk(target)
+ for source_item, target_item in zip(source_walk, target_walk, strict=True):
+ self.assertEqual(source_item[0].parts[len(source.parts):],
+ target_item[0].parts[len(target.parts):]) # dirpath
+ self.assertEqual(source_item[1], target_item[1]) # dirnames
+ self.assertEqual(source_item[2], target_item[2]) # filenames
+ # Compare files and symlinks
+ for filename in source_item[2]:
+ source_file = source_item[0].joinpath(filename)
+ target_file = target_item[0].joinpath(filename)
+ if follow_symlinks or not source_file.is_symlink():
+ # Regular file.
+ self.assertEqual(source_file.read_bytes(), target_file.read_bytes())
+ elif source_file.is_dir():
+ # Symlink to directory.
+ self.assertTrue(target_file.is_dir())
+ self.assertEqual(source_file.readlink(), target_file.readlink())
+ else:
+ # Symlink to file.
+ self.assertEqual(source_file.read_bytes(), target_file.read_bytes())
+ self.assertEqual(source_file.readlink(), target_file.readlink())
+
+ def test_copy_dir_complex_follow_symlinks_false(self):
+ self.test_copy_dir_complex(follow_symlinks=False)
+
+ def test_copy_dir_to_existing_directory(self):
+ base = self.cls(self.base)
+ source = base / 'dirC'
+ target = base / 'copyC'
+ target.mkdir()
+ target.joinpath('dirD').mkdir()
+ self.assertRaises(FileExistsError, source.copy, target)
+
+ def test_copy_dir_to_existing_directory_dirs_exist_ok(self):
+ base = self.cls(self.base)
+ source = base / 'dirC'
+ target = base / 'copyC'
+ target.mkdir()
+ target.joinpath('dirD').mkdir()
+ result = source.copy(target, dirs_exist_ok=True)
+ self.assertEqual(result, target)
+ self.assertTrue(target.is_dir())
+ self.assertTrue(target.joinpath('dirD').is_dir())
+ self.assertTrue(target.joinpath('dirD', 'fileD').is_file())
+ self.assertEqual(target.joinpath('dirD', 'fileD').read_text(),
+ "this is file D\n")
+ self.assertTrue(target.joinpath('fileC').is_file())
+ self.assertTrue(target.joinpath('fileC').read_text(),
+ "this is file C\n")
+
+ def test_copy_dir_to_itself(self):
+ base = self.cls(self.base)
+ source = base / 'dirC'
+ self.assertRaises(OSError, source.copy, source)
+ self.assertRaises(OSError, source.copy, source, follow_symlinks=False)
+
+ def test_copy_dir_into_itself(self):
+ base = self.cls(self.base)
+ source = base / 'dirC'
+ target = base / 'dirC' / 'dirD' / 'copyC'
+ self.assertRaises(OSError, source.copy, target)
+ self.assertRaises(OSError, source.copy, target, follow_symlinks=False)
+ self.assertFalse(target.exists())
+
+ def test_copy_into(self):
+ base = self.cls(self.base)
+ source = base / 'fileA'
+ target_dir = base / 'dirA'
+ result = source.copy_into(target_dir)
+ self.assertEqual(result, target_dir / 'fileA')
+ self.assertTrue(result.exists())
+ self.assertEqual(source.read_text(), result.read_text())
+
+ def test_copy_into_empty_name(self):
+ source = self.cls('')
+ target_dir = self.base
+ self.assertRaises(ValueError, source.copy_into, target_dir)
+
+
+class DummyReadablePathWalkTest(unittest.TestCase):
+ cls = DummyReadablePath
+ base = DummyReadablePathTest.base
can_symlink = False
def setUp(self):