import multiprocessing
import os
import socket
-import tempfile
-
-from . import distro
-from . import shell
from . import _pakfire
-
+from . import distro
from .i18n import _
class System(object):
# Create an instance of this class to only keep it once in memory.
system = System()
-class Mountpoints(object):
- def __init__(self, root="/"):
- self._mountpoints = []
-
- # Scan for all mountpoints on the system.
- self._scan(root)
-
- def __iter__(self):
- return iter(self._mountpoints)
-
- def _scan(self, root):
- # Get the real path of root.
- root = os.path.realpath(root)
-
- # If root is not equal to /, we are in a chroot and
- # our root must be a mountpoint to count files.
- if not root == "/":
- mp = Mountpoint("/", root=root)
- self._mountpoints.append(mp)
-
- f = open("/proc/mounts")
-
- for line in f.readlines():
- line = line.split()
-
- # The mountpoint is the second argument.
- mountpoint = line[1]
-
- # Skip all mountpoints that are not in our root directory.
- if not mountpoint.startswith(root):
- continue
-
- mountpoint = os.path.relpath(mountpoint, root)
- if mountpoint == ".":
- mountpoint = "/"
- else:
- mountpoint = os.path.join("/", mountpoint)
-
- mp = Mountpoint(mountpoint, root=root)
-
- if not mp in self._mountpoints:
- self._mountpoints.append(mp)
-
- f.close()
-
- # Sort all mountpoints for better searching.
- self._mountpoints.sort()
-
- def add_pkg(self, pkg):
- for file in pkg.filelist:
- self.add(file)
-
- def rem_pkg(self, pkg):
- for file in pkg.filelist:
- self.rem(file)
-
- def add(self, file):
- for mp in reversed(self._mountpoints):
- # Check if the file is located on this mountpoint.
- if not file.name.startswith(mp.path):
- continue
-
- # Add file to this mountpoint.
- mp.add(file)
- break
-
- def rem(self, file):
- for mp in reversed(self._mountpoints):
- # Check if the file is located on this mountpoint.
- if not file.name.startswith(mp.path):
- continue
-
- # Remove file from this mountpoint.
- mp.rem(file)
- break
-
-
-class Mountpoint(object):
- def __init__(self, path, root="/"):
- self.path = path
- self.root = root
-
- # Cache the statvfs call of the mountpoint.
- self.__stat = None
-
- # Save the amount of data that is used or freed.
- self.disk_usage = 0
-
- def __repr__(self):
- return "<%s %s>" % (self.__class__.__name__, self.fullpath)
-
- def __cmp__(self, other):
- return cmp(self.fullpath, other.fullpath)
-
- @property
- def fullpath(self):
- path = self.path
- while path.startswith("/"):
- path = path[1:]
-
- return os.path.join(self.root, path)
-
- @property
- def stat(self):
- if self.__stat is None:
- # Find the next mountpoint, because we cannot
- # statvfs any path in the FS.
- path = os.path.realpath(self.fullpath)
-
- # Walk to root until we find a mountpoint.
- while not os.path.ismount(path):
- path = os.path.dirname(path)
-
- # See what we can get.
- self.__stat = os.statvfs(path)
-
- return self.__stat
-
- @property
- def free(self):
- return self.stat.f_bavail * self.stat.f_bsize
-
- @property
- def space_needed(self):
- if self.disk_usage > 0:
- return self.disk_usage
-
- return 0
-
- @property
- def space_left(self):
- return self.free - self.space_needed
-
- def add(self, file):
- assert file.name.startswith(self.path)
-
- # Round filesize to 4k blocks.
- block_size = 4096
-
- blocks = file.size // block_size
- if file.size % block_size:
- blocks += 1
-
- self.disk_usage += blocks * block_size
-
- def rem(self, file):
- assert file.name.startswith(self.path)
-
- self.disk_usage -= file.size
-
- def is_readonly(self):
- """
- Returns True if the mountpoint is mounted read-only.
- Otherwise False.
- """
- # Using the statvfs output does not really work, so we use
- # a very naive approach here, were we just try to create a
- # new file. If that works, it's writable.
-
- try:
- handle, path = tempfile.mkstemp(prefix="ro-test-", dir=self.fullpath)
- except OSError as e:
- # Read-only file system.
- if e.errno == 30:
- return True
-
- # Raise all other exceptions.
- raise
- else:
- # Close the file and remove it.
- os.close(handle)
- os.unlink(path)
-
- return False
-
- def remount(self, rorw=None):
- options = "remount"
- if rorw in ("ro", "rw"):
- options = "%s,%s" % (options, rorw)
-
- try:
- shellenv = shell.ShellExecuteEnvironment(
- ["mount", "-o", options, self.fullpath],
- shell=False,
- )
- shellenv.execute()
- except ShellEnvironmentError as e:
- raise OSError
-
-
if __name__ == "__main__":
print("Hostname", system.hostname)
print("Arch", system.arch)