import contextlib
import ctypes, ctypes.util
import crypt
+import errno
+import fcntl
import getpass
import hashlib
import os
+import pathlib
import platform
import shutil
import stat
raise
return path
+_IOC_NRBITS = 8
+_IOC_TYPEBITS = 8
+_IOC_SIZEBITS = 14
+_IOC_DIRBITS = 2
+
+_IOC_NRSHIFT = 0
+_IOC_TYPESHIFT = _IOC_NRSHIFT + _IOC_NRBITS
+_IOC_SIZESHIFT = _IOC_TYPESHIFT + _IOC_TYPEBITS
+_IOC_DIRSHIFT = _IOC_SIZESHIFT + _IOC_SIZEBITS
+
+_IOC_NONE = 0
+_IOC_WRITE = 1
+_IOC_READ = 2
+
+def _IOC(dir, type, nr, argtype):
+ size = {'int':4, 'size_t':8}[argtype]
+ return dir<<_IOC_DIRSHIFT | type<<_IOC_TYPESHIFT | nr<<_IOC_NRSHIFT | size<<_IOC_SIZESHIFT
+def _IOW(type, nr, size):
+ return _IOC(_IOC_WRITE, type, nr, size)
+
+FICLONE = _IOW(0x94, 9, 'int')
+
+@contextlib.contextmanager
+def open_close(path, flags, mode=0o664):
+ fd = os.open(path, flags | os.O_CLOEXEC, mode)
+ try:
+ yield fd
+ finally:
+ os.close(fd)
+
+def _reflink(oldfd, newfd):
+ fcntl.ioctl(newfd, FICLONE, oldfd)
+
+def _copy_file(oldfd, newfd):
+ try:
+ _reflink(oldfd, newfd)
+ except OSError as e:
+ if e.errno not in {errno.EXDEV, errno.EOPNOTSUPP}:
+ raise
+ shutil.copyfileobj(open(oldfd, 'rb', closefd=False),
+ open(newfd, 'wb', closefd=False))
+
+def copy_file(oldpath, newpath):
+ with open_close(oldpath, os.O_RDONLY) as oldfd:
+ st = os.stat(oldfd)
+
+ try:
+ with open_close(newpath, os.O_WRONLY|os.O_CREAT, st.st_mode) as newfd:
+ _copy_file(oldfd, newfd)
+ except FileExistsError:
+ os.unlink(newpath)
+ with open_close(newpath, os.O_WRONLY|os.O_CREAT, st.st_mode) as newfd:
+ _copy_file(oldfd, newfd)
+
+def symlink_f(target, path):
+ try:
+ os.symlink(target, path)
+ except FileExistsError:
+ os.unlink(path)
+ os.symlink(target, path)
+
+def copy(oldpath, newpath):
+ if not isinstance(newpath, pathlib.Path):
+ newpath = pathlib.Path(newpath)
+
+ try:
+ mkdir_last(newpath)
+ except FileExistsError:
+ # something that is not a directory already exists
+ os.unlink(path)
+ mkdir_last(newpath)
+
+ for entry in os.scandir(oldpath):
+ newentry = newpath / entry.name
+ if entry.is_dir(follow_symlinks=False):
+ copy(entry.path, newentry)
+ elif entry.is_symlink():
+ target = os.readlink(entry.path)
+ symlink_f(target, newentry)
+ else:
+ st = entry.stat(follow_symlinks=False)
+ if stat.S_ISREG(st.st_mode):
+ copy_file(entry.path, newentry)
+ else:
+ print('Ignoring', entry.path)
+ continue
+ shutil.copystat(entry.path, newentry, follow_symlinks=False)
+
@contextlib.contextmanager
def complete_step(text, text2=None):
print_step(text + '...')
if args.distribution == Distribution.opensuse:
install_boot_loader_opensuse(args, workspace)
-def enumerate_and_copy(source, dest):
- subprocess.run(["cp", "--reflink=auto", "--recursive", "--no-dereference", "--preserve=all", "--no-target-directory", source, dest], check=True)
-
def install_extra_trees(args, workspace, for_cache):
if args.extra_trees is None:
return
with complete_step('Copying in extra file trees'):
for d in args.extra_trees:
- enumerate_and_copy(d, os.path.join(workspace, "root"))
+ copy(d, os.path.join(workspace, "root"))
def copy_git_files(src, dest, *, git_files):
subprocess.run(['git', 'clone', '--depth=1', '--recursive', '--shallow-submodules', src, dest],
directory = os.path.dirname(dest_path)
os.makedirs(directory, exist_ok=True)
- shutil.copy2(src_path, dest_path, follow_symlinks=False)
+ copy_file(src_path, dest_path)
def install_build_src(args, workspace, run_build_script, for_cache):
if not run_build_script:
return
with complete_step('Copying in build script and sources'):
- shutil.copy(args.build_script,
- os.path.join(workspace, "root", "root", os.path.basename(args.build_script)))
+ copy_file(args.build_script,
+ os.path.join(workspace, "root", "root", os.path.basename(args.build_script)))
if args.build_sources is not None:
target = os.path.join(workspace, "root", "root/src")
return
with complete_step('Copying in build tree'):
- enumerate_and_copy(os.path.join(workspace, "dest"), os.path.join(workspace, "root"))
+ copy(os.path.join(workspace, "dest"), os.path.join(workspace, "root"))
def make_read_only(args, workspace, for_cache):
if not args.read_only:
with complete_step('Copying in cached tree ' + fname):
try:
- enumerate_and_copy(fname, os.path.join(workspace, "root"))
+ copy(fname, os.path.join(workspace, "root"))
except FileNotFoundError:
return False