]> git.ipfire.org Git - pbs.git/blobdiff - src/buildservice/__init__.py
backend: Move cleanup job into the main thread
[pbs.git] / src / buildservice / __init__.py
index db7de0ebc767ca5e22813aeef380d50d513dc591..862ed46b42aa615af273779870a011f73a24d523 100644 (file)
@@ -1,7 +1,9 @@
 #!/usr/bin/python
 
+import aiofiles
 import asyncio
 import configparser
+import datetime
 import inspect
 import logging
 import os
@@ -28,6 +30,7 @@ from . import logstreams
 from . import messages
 from . import mirrors
 from . import packages
+from . import ratelimiter
 from . import releasemonitoring
 from . import repository
 from . import settings
@@ -66,24 +69,25 @@ class Backend(object):
                # Initialize the HTTP Client
                self.httpclient = httpclient.HTTPClient(self)
 
-               self.aws               = aws.AWS(self)
-               self.builds            = builds.Builds(self)
-               self.cache             = cache.Cache(self)
-               self.jobs              = jobs.Jobs(self)
-               self.builders          = builders.Builders(self)
-               self.distros           = distribution.Distributions(self)
-               self.events            = events.Events(self)
-               self.keys              = keys.Keys(self)
-               self.logstreams        = logstreams.LogStreams(self)
-               self.messages          = messages.Messages(self)
-               self.mirrors           = mirrors.Mirrors(self)
-               self.packages          = packages.Packages(self)
-               self.releasemonitoring = releasemonitoring.ReleaseMonitoring(self)
-               self.repos             = repository.Repositories(self)
-               self.sessions          = sessions.Sessions(self)
-               self.sources           = sources.Sources(self)
-               self.uploads           = uploads.Uploads(self)
-               self.users             = users.Users(self)
+               self.aws         = aws.AWS(self)
+               self.builds      = builds.Builds(self)
+               self.cache       = cache.Cache(self)
+               self.jobs        = jobs.Jobs(self)
+               self.builders    = builders.Builders(self)
+               self.distros     = distribution.Distributions(self)
+               self.events      = events.Events(self)
+               self.keys        = keys.Keys(self)
+               self.logstreams  = logstreams.LogStreams(self)
+               self.messages    = messages.Messages(self)
+               self.mirrors     = mirrors.Mirrors(self)
+               self.packages    = packages.Packages(self)
+               self.monitorings = releasemonitoring.Monitorings(self)
+               self.ratelimiter = ratelimiter.RateLimiter(self)
+               self.repos       = repository.Repositories(self)
+               self.sessions    = sessions.Sessions(self)
+               self.sources     = sources.Sources(self)
+               self.uploads     = uploads.Uploads(self)
+               self.users       = users.Users(self)
 
                # Open a connection to bugzilla.
                self.bugzilla          = bugtracker.Bugzilla(self)
@@ -93,6 +97,26 @@ class Backend(object):
 
                log.info("Pakfire Build Service initialized at %s" % self.basepath)
 
+       def launch_background_tasks(self):
+               # Launch some initial tasks
+               self.run_task(self.users.generate_vapid_keys)
+               self.run_task(self.builders.autoscale)
+
+               # Regularly sync data to the mirrors
+               self.run_periodic_task(300, self.sync)
+
+               # Regularly check the mirrors
+               self.run_periodic_task(300, self.mirrors.check)
+
+               # Regularly fetch sources
+               self.run_periodic_task(300, self.sources.fetch)
+
+               # Regularly check for new releases
+               self.run_periodic_task(300, self.monitorings.check)
+
+               # Cleanup regularly
+               self.run_periodic_task(3600, self.cleanup)
+
        def read_config(self, path):
                c = configparser.ConfigParser()
 
@@ -114,7 +138,7 @@ class Backend(object):
 
                log.debug("Connecting to database %s @ %s" % (name, hostname))
 
-               return database.Connection(hostname, name, user=user, password=password)
+               return database.Connection(self, hostname, name, user=user, password=password)
 
        def _create_tmp_path(self):
                """
@@ -210,7 +234,7 @@ class Backend(object):
                """
                        Runs this shell command
                """
-               with tempfile.TemporaryDirectory() as tmp:
+               async with self.tempdir() as tmp:
                        # Create a minimal environment
                        env = {
                                "HOME"       : os.environ.get("HOME", tmp),
@@ -229,18 +253,36 @@ class Backend(object):
                        # Run the command
                        return await self._command(*command, env=env, **kwargs)
 
-       async def _command(self, *command, return_output=False, **kwargs):
+       async def _command(self, *command, return_output=False, input=None, **kwargs):
                log.debug("Running command: %s" % " ".join(command))
 
                # Fork child process
                process = await asyncio.create_subprocess_exec(
                        *command,
-                       stdin=asyncio.subprocess.DEVNULL,
+                       stdin=asyncio.subprocess.PIPE if input else asyncio.subprocess.DEVNULL,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.PIPE,
                        **kwargs,
                )
 
+               # Send input
+               if input:
+                       # Convert to bytes
+                       if not isinstance(input, bytes):
+                               input = input.encode()
+
+                       # Write the entire data chunk by chunk
+                       while input:
+                               chunk, input = input[0:64], input[64:]
+                               if not chunk:
+                                       break
+
+                               process.stdin.write(chunk)
+                               await process.stdin.drain()
+
+                       # Close the input once we are done
+                       process.stdin.close()
+
                stdout = []
 
                # Fetch output of command and send it to the logger
@@ -304,6 +346,18 @@ class Backend(object):
                        "kinit", "-k", "-t", keytab, principal, **kwargs,
                )
 
+       async def exists(self, *args, **kwargs):
+               """
+                       Checks whether a file exists
+               """
+               return await asyncio.to_thread(os.path.exists, *args, **kwargs)
+
+       async def makedirs(self, path, **kwargs):
+               """
+                       Creates a new directory
+               """
+               return await asyncio.to_thread(os.makedirs, path, **kwargs)
+
        async def copy(self, src, dst, mode=None):
                """
                        Copies a file from src to dst
@@ -320,17 +374,25 @@ class Backend(object):
                if mode:
                        await asyncio.to_thread(os.chmod, dst, mode)
 
+       async def move(self, src, dst, **kwargs):
+               """
+                       Moves something from src to dst
+               """
+               log.debug("Moving %s to %s" % (src, dst))
+
+               # Create parent directory
+               await self.make_parent_directory(dst)
+
+               # Move!
+               await asyncio.to_thread(shutil.move, src, dst, **kwargs)
+
        async def make_parent_directory(self, path):
                """
                        Creates the parent directory of path
                """
                path = os.path.dirname(path)
 
-               # Create destination path (if it does not exist)
-               try:
-                       await asyncio.to_thread(os.makedirs, path)
-               except FileExistsError:
-                       pass
+               return await self.makedirs(path, exist_ok=True)
 
        async def unlink(self, path):
                """
@@ -360,7 +422,7 @@ class Backend(object):
                        path = os.path.dirname(path)
 
                        # Break if we reached the base path
-                       if path == self.basepath:
+                       if path == self.basepath or path == self.path("tmp"):
                                break
 
                        # Call rmdir()
@@ -371,13 +433,45 @@ class Backend(object):
 
                        log.debug("  Cleaned up %s..." % path)
 
+       async def rmtree(self, path):
+               """
+                       Removes a directory recursively
+               """
+               # Normalize the path
+               path = os.path.abspath(path)
+
+               # Check if the path is within our base directory
+               if not path.startswith(self.basepath):
+                       raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
+               log.debug("Removing %s..." % path)
+
+               try:
+                       await asyncio.to_thread(shutil.rmtree, path)
+
+               # Ignore if path didn't exist in the first place
+               except FileNotFoundError:
+                       pass
+
+       async def chmod(self, *args, **kwargs):
+               return await asyncio.to_thread(os.chmod, *args, **kwargs)
+
        def tempfile(self, mode="w+b", delete=True):
                """
                        Returns an open file handle to a new temporary file
                """
                path = self.path("tmp")
 
-               return tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)
+               return aiofiles.tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)
+
+       def tempdir(self, **kwargs):
+               """
+                       Asynchronously creates a new temporary directory
+               """
+               # Default to our own tmp directory
+               path = self.path("tmp")
+
+               return aiofiles.tempfile.TemporaryDirectory(dir=path, **kwargs)
 
        def _write_tempfile(self, content):
                """
@@ -398,14 +492,11 @@ class Backend(object):
                """
                        Opens a package and returns the archive
                """
-               return await asyncio.to_thread(self._open, path)
-
-       def _open(self, path):
                log.debug("Opening %s..." % path)
 
                # Open the archive
-               with self.pakfire() as p:
-                       return p.open(path)
+               async with self.pakfire() as p:
+                       return await asyncio.to_thread(p.open, path)
 
        @property
        def ssl_context(self):
@@ -467,11 +558,14 @@ class Backend(object):
                        log.warning("No sync target configured")
                        return 0
 
+               # Update the timestamp
+               await self._update_timestamp()
+
                commandline = [
                        "rsync",
 
                        # Show what is being transferred
-                       #"--verbose",
+                       "--verbose",
 
                        # Compress any transferred data
                        "--compress",
@@ -497,6 +591,9 @@ class Backend(object):
                        # Add source & target
                        "%s/" % self.basepath,
                        target,
+
+                       # Sync the .timestamp
+                       "--include=.timestamp",
                ]
 
                # Add all mirrored repositories
@@ -511,6 +608,19 @@ class Backend(object):
                # Run command
                await self.command(*commandline, krb5_auth=True)
 
+       async def _update_timestamp(self):
+               """
+                       Updates the .timestamp file in the root of the exported files
+               """
+               # Make filename
+               path = os.path.join(self.basepath, ".timestamp")
+
+               t = datetime.datetime.utcnow()
+
+               # Write the current time as seconds since epoch
+               async with aiofiles.open(path, mode="w") as f:
+                       await f.write(t.strftime("%s"))
+
 
 def setup_logging():
        """