From: Dmitry Selyutin Date: Thu, 30 Nov 2017 19:52:30 +0000 (+0300) Subject: vfs: standalone filesystem functions X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b05a55bb4c4d9d4cedb76f1dd54bba6bd69f4cbe;p=thirdparty%2Fgnulib.git vfs: standalone filesystem functions --- diff --git a/pygnulib/vfs.py b/pygnulib/vfs.py index 8b0c29ac48..346788c829 100644 --- a/pygnulib/vfs.py +++ b/pygnulib/vfs.py @@ -5,6 +5,7 @@ import codecs as _codecs +import filecmp as _filecmp import os as _os import shutil as _shutil import tempfile as _tempfile @@ -26,7 +27,7 @@ class Base: for (key, value) in table.items(): _type_assert(key, value, str) self.__table[key] = _os.path.normpath(value) - self.__prefix = _os.path.abspath(prefix) + self.__prefix = prefix def __repr__(self): @@ -47,7 +48,7 @@ class Base: path = _os.path.normpath(name) if _os.path.isabs(name): raise ValueError("name must be a relative path") - path = _os.path.join(self.__prefix, name) + path = _os.path.join(self.path, name) return _os.path.exists(path) @@ -66,19 +67,19 @@ class Base: part = self.__table[part] replaced = True parts += [part] - path = _os.path.sep.join([self.__prefix] + parts) + path = _os.path.sep.join([self.path] + parts) return _os.path.normpath(path) @property - def base_prefix(self): - """base path prefix""" + def name(self): + """base VFS name""" return self.__prefix @property - def full_prefix(self): - """absolute path prefix""" + def path(self): + """absolute VFS path""" return _os.path.abspath(self.__prefix) @@ -106,10 +107,10 @@ def lookup(name, primary, secondary, patch="patch"): _type_assert("primary", primary, Base) _type_assert("secondary", secondary, Base) if name in secondary: - return (secondary[name], secondary) + return (secondary, name) diff = "{}.diff".format(name) if diff not in secondary: - return (primary[name], primary) + return (primary, name) tmp = _tempfile.NamedTemporaryFile(mode="w+b", delete=False) with _codecs.open(primary[name], "rb") as stream: _shutil.copyfileobj(stream, tmp) @@ -124,109 +125,116 @@ def lookup(name, primary, secondary, patch="patch"): if returncode != 0: cmd = "patch -s {} < {}".format(tmp.name, secondary[diff]) raise _sp.CalledProcessError(returncode, cmd, stdout, stderr) - return (tmp.name, None) - - - -class Project(Base): - """project virtual file system""" - def __init__(self, name, **table): - path = _os.path.realpath(name) - if not _os.path.exists(path): - raise FileNotFoundError(path) - if not _os.path.isdir(path): - raise NotADirectoryError(path) - super().__init__(name, **table) - self.__patch = None - - - @property - def patch(self): - """path to patch binary""" - if self.__patch is None: - raise AttributeError("patch") - return self.__patch - - @patch.setter - def patch(self, path): - _type_assert("path", path, str) - if not _os.path.isabs(path): - if _shutil.which(path) is None: - raise FileNotFoundError("patch") - self.__patch = path - - - def backup(self, name): - """Backup the given file.""" - backup = "{}~".format(name) - try: - _os.unlink(self[backup]) - except FileNotFoundError: - pass # ignore non-existent files - _shutil.copy(self[name], self[backup]) - - - def copy(self, src, dst): - """Copy file data.""" - srcabs = _os.path.isabs(self, src) - dstabs = _os.path.isabs(self, dst) - if srcabs and dstabs: - raise ValueError("absolute src and dst") - limit = (16 * 1024) - src = src if srcabs else _os.path.join(self.full_prefix, src) - dst = dst if dstabs else _os.path.join(self.full_prefix, dst) - with _codecs.open(src, "rb") as istream: - with _codecs.open(dst, "wb") as ostream: - while 1: - data = istream.read(limit) - if not data: - break - ostream.write(data) - - - def hardlink(self, src, dst): - """Create a hard link to the file.""" - srcabs = _os.path.isabs(self, src) - dstabs = _os.path.isabs(self, dst) - if srcabs and dstabs: - raise ValueError("absolute src and dst") - src = src if srcabs else _os.path.join(self.full_prefix, src) - dst = dst if dstabs else _os.path.join(self.full_prefix, dst) - _os.link(src, dst) - - - def mkdir(self, name): - """Create a leaf directory and all intermediate ones recursively.""" - _os.makedirs(self[name], exist_ok=True) - - - def move(self, src, dst): - """Move file data.""" - srcabs = _os.path.isabs(self, src) - dstabs = _os.path.isabs(self, dst) - if srcabs and dstabs: - raise ValueError("absolute src and dst") - src = src if srcabs else _os.path.join(self.full_prefix, src) - dst = dst if dstabs else _os.path.join(self.full_prefix, dst) - _os.rename(src, dst) - - - def symlink(self, src, dst): - """Create a symbolic link to the file.""" - srcabs = _os.path.isabs(self, src) - dstabs = _os.path.isabs(self, dst) - if srcabs and dstabs: - raise ValueError("absolute src and dst") - src = src if srcabs else _os.path.join(self.full_prefix, src) - dst = dst if dstabs else _os.path.join(self.full_prefix, dst) - _os.symlink(src, dst) - - - def unlink(self, name, backup=True): - """Unlink a file, backing it up if necessary.""" - if backup: - self.backup(name) - _os.unlink(self[name]) + return (None, tmp.name) + + +def mkdir(root, name): + """Create a leaf directory and all intermediate ones recursively.""" + root = Base(".") if root is None else root + _os.makedirs(root[name], exist_ok=True) + + +def backup(root, name): + """Backup the given file.""" + root = Base(".") if root is None else root + path = _os.path.join(root.path, root[name]) + backup = "{}~".format(path) + try: + _os.unlink(backup) + except FileNotFoundError: + pass # ignore non-existent files + _os.rename(path, backup) + + +def compare(lhs_root, lhs_name, rhs_root, rhs_name): + """Compare the given files; return True if files contain the same data.""" + lhs_root = Base(".") if lhs_root is None else lhs_root + rhs_root = Base(".") if rhs_root is None else rhs_root + lhs_path = _os.path.join(lhs_root.path, lhs_root[lhs_name]) + rhs_path = _os.path.join(rhs_root.path, rhs_root[rhs_name]) + return _filecmp.cmp(lhs_path, rhs_path, shallow=False) + + +def copy(src_root, src_name, dst_root, dst_name): + """Copy file data.""" + src_abs = _os.path.isabs(src_name) + dst_abs = _os.path.isabs(dst_name) + if src_abs and dst_abs: + raise ValueError("absolute src and dst") + limit = (16 * 1024) + src_root = Base(".") if src_root is None else src_root + dst_root = Base(".") if dst_root is None else dst_root + mkdir(src_root, _os.path.dirname(src_name)) + mkdir(dst_root, _os.path.dirname(dst_name)) + src_path = _os.path.join(src_root.path, src_root[src_name]) + dst_path = _os.path.join(dst_root.path, dst_root[dst_name]) + with _codecs.open(src_path, "rb") as istream: + with _codecs.open(dst_path, "wb") as ostream: + while 1: + data = istream.read(limit) + if not data: + break + ostream.write(data) + + +def exists(root, name): + """Check whether the given file exists.""" + root = Base(".") if root is None else root + path = _os.path.join(root.path, root[name]) + return _os.path.exists(path) + + +def hardlink(src_root, src_name, dst_root, dst_name): + """Create a hard link to the file.""" + src_abs = _os.path.isabs(src_name) + dst_abs = _os.path.isabs(dst_name) + if src_abs and dst_abs: + raise ValueError("absolute src and dst") + src_root = Base(".") if src_root is None else src_root + dst_root = Base(".") if dst_root is None else dst_root + mkdir(src_root, _os.path.dirname(src_name)) + mkdir(dst_root, _os.path.dirname(dst_name)) + src_path = _os.path.join(src_root.path, src_root[src_name]) + dst_path = _os.path.join(dst_root.path, dst_root[dst_name]) + _os.link(src_path, dst_path) + + +def move(src_root, src_name, dst_root, dst_name): + """Move file data.""" + src_abs = _os.path.isabs(src_name) + dst_abs = _os.path.isabs(dst_name) + if src_abs and dst_abs: + raise ValueError("absolute src and dst") + src_root = Base(".") if src_root is None else src_root + dst_root = Base(".") if dst_root is None else dst_root + mkdir(src_root, _os.path.dirname(src_name)) + mkdir(dst_root, _os.path.dirname(dst_name)) + src_path = _os.path.join(src_root.path, src_root[src_name]) + dst_path = _os.path.join(dst_root.path, dst_root[dst_name]) + _os.rename(src_path, dst_path) + + +def symlink(src_root, src_name, dst_root, dst_name): + """Create a symbolic link to the file.""" + src_abs = _os.path.isabs(src_name) + dst_abs = _os.path.isabs(dst_name) + if src_abs and dst_abs: + raise ValueError("absolute src and dst") + src_root = Base(".") if src_root is None else src_root + dst_root = Base(".") if dst_root is None else dst_root + mkdir(src_root, _os.path.dirname(src_name)) + mkdir(dst_root, _os.path.dirname(dst_name)) + src_path = _os.path.join(src_root.path, src_root[src_name]) + dst_path = _os.path.join(dst_root.path, dst_root[dst_name]) + _os.symlink(src_path, dst_path) + + +def unlink(root, name, backup=True): + """Unlink a file, backing it up if necessary.""" + root = Base(".") if root is None else root + mkdir(root, _os.path.dirname(name)) + path = _os.path.join(root.path, root[name]) + _os.unlink(path) @@ -247,11 +255,11 @@ class GnulibGit(Base): def __init__(self, prefix, **table): super().__init__(prefix, **table) - if not _os.path.exists(self.full_prefix): - raise FileNotFoundError(self.full_prefix) - if not _os.path.isdir(self.full_prefix): - raise NotADirectoryError(self.full_prefix) - if not _os.path.isdir(_os.path.join(self.full_prefix, ".git")): + if not _os.path.exists(self.path): + raise FileNotFoundError(self.path) + if not _os.path.isdir(self.path): + raise NotADirectoryError(self.path) + if not _os.path.isdir(_os.path.join(self.path, ".git")): raise TypeError("{} is not a gnulib repository".format(prefix))