]> git.ipfire.org Git - pbs.git/blobdiff - src/buildservice/__init__.py
backend: Move cleanup job into the main thread
[pbs.git] / src / buildservice / __init__.py
index 6a2e8326afe1390469c5e123f61b8e00f783003a..862ed46b42aa615af273779870a011f73a24d523 100644 (file)
@@ -1,7 +1,9 @@
 #!/usr/bin/python
 
+import aiofiles
 import asyncio
 import configparser
+import datetime
 import inspect
 import logging
 import os
@@ -21,12 +23,15 @@ from . import config
 from . import database
 from . import distribution
 from . import events
-from . import jobqueue
+from . import httpclient
 from . import jobs
 from . import keys
+from . import logstreams
 from . import messages
 from . import mirrors
 from . import packages
+from . import ratelimiter
+from . import releasemonitoring
 from . import repository
 from . import settings
 from . import sessions
@@ -61,6 +66,9 @@ class Backend(object):
                # Global pakfire settings (from database).
                self.settings = settings.Settings(self)
 
+               # Initialize the HTTP Client
+               self.httpclient = httpclient.HTTPClient(self)
+
                self.aws         = aws.AWS(self)
                self.builds      = builds.Builds(self)
                self.cache       = cache.Cache(self)
@@ -68,11 +76,13 @@ class Backend(object):
                self.builders    = builders.Builders(self)
                self.distros     = distribution.Distributions(self)
                self.events      = events.Events(self)
-               self.jobqueue    = jobqueue.JobQueue(self)
                self.keys        = keys.Keys(self)
+               self.logstreams  = logstreams.LogStreams(self)
                self.messages    = messages.Messages(self)
                self.mirrors     = mirrors.Mirrors(self)
                self.packages    = packages.Packages(self)
+               self.monitorings = releasemonitoring.Monitorings(self)
+               self.ratelimiter = ratelimiter.RateLimiter(self)
                self.repos       = repository.Repositories(self)
                self.sessions    = sessions.Sessions(self)
                self.sources     = sources.Sources(self)
@@ -80,13 +90,33 @@ class Backend(object):
                self.users       = users.Users(self)
 
                # Open a connection to bugzilla.
-               self.bugzilla    = bugtracker.Bugzilla(self)
+               self.bugzilla          = bugtracker.Bugzilla(self)
 
                # Create a temporary directory
                self._create_tmp_path()
 
                log.info("Pakfire Build Service initialized at %s" % self.basepath)
 
+       def launch_background_tasks(self):
+               # Launch some initial tasks
+               self.run_task(self.users.generate_vapid_keys)
+               self.run_task(self.builders.autoscale)
+
+               # Regularly sync data to the mirrors
+               self.run_periodic_task(300, self.sync)
+
+               # Regularly check the mirrors
+               self.run_periodic_task(300, self.mirrors.check)
+
+               # Regularly fetch sources
+               self.run_periodic_task(300, self.sources.fetch)
+
+               # Regularly check for new releases
+               self.run_periodic_task(300, self.monitorings.check)
+
+               # Cleanup regularly
+               self.run_periodic_task(3600, self.cleanup)
+
        def read_config(self, path):
                c = configparser.ConfigParser()
 
@@ -108,7 +138,7 @@ class Backend(object):
 
                log.debug("Connecting to database %s @ %s" % (name, hostname))
 
-               return database.Connection(hostname, name, user=user, password=password)
+               return database.Connection(self, hostname, name, user=user, password=password)
 
        def _create_tmp_path(self):
                """
@@ -129,20 +159,26 @@ class Backend(object):
                """
                return os.path.join(self.basepath, *args)
 
-       def path_to_url(self, path):
+       def url_to(self, url):
                """
-                       Takes a path to a file on the file system and converts it into a URL
+                       Takes a relative URL and makes it absolute
                """
                # The base URL
                baseurl = self.settings.get("baseurl")
 
+               # Join it all together
+               return urllib.parse.urljoin(baseurl, url)
+
+       def path_to_url(self, path):
+               """
+                       Takes a path to a file on the file system and converts it into a URL
+               """
                # Path to package
                path = os.path.join(
                        "files", os.path.relpath(path, self.basepath),
                )
 
-               # Join it all together
-               return urllib.parse.urljoin(baseurl, path)
+               return self.url_to(path)
 
        def pakfire(self, *args, **kwargs):
                """
@@ -179,9 +215,6 @@ class Backend(object):
                log.debug("Periodic callback %r started" % callback)
 
                while True:
-                       # Wait a little moment
-                       await asyncio.sleep(delay)
-
                        try:
                                ret = callback(*args)
 
@@ -192,13 +225,16 @@ class Backend(object):
                        except Exception as e:
                                log.error("Exception in periodic callback %r" % callback, exc_info=True)
 
+                       # Wait a little moment
+                       await asyncio.sleep(delay)
+
        # Commands
 
        async def command(self, *command, krb5_auth=False, **kwargs):
                """
                        Runs this shell command
                """
-               with tempfile.TemporaryDirectory() as tmp:
+               async with self.tempdir() as tmp:
                        # Create a minimal environment
                        env = {
                                "HOME"       : os.environ.get("HOME", tmp),
@@ -217,18 +253,36 @@ class Backend(object):
                        # Run the command
                        return await self._command(*command, env=env, **kwargs)
 
-       async def _command(self, *command, return_output=False, **kwargs):
+       async def _command(self, *command, return_output=False, input=None, **kwargs):
                log.debug("Running command: %s" % " ".join(command))
 
                # Fork child process
                process = await asyncio.create_subprocess_exec(
                        *command,
-                       stdin=asyncio.subprocess.DEVNULL,
+                       stdin=asyncio.subprocess.PIPE if input else asyncio.subprocess.DEVNULL,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.PIPE,
                        **kwargs,
                )
 
+               # Send input
+               if input:
+                       # Convert to bytes
+                       if not isinstance(input, bytes):
+                               input = input.encode()
+
+                       # Write the entire data chunk by chunk
+                       while input:
+                               chunk, input = input[0:64], input[64:]
+                               if not chunk:
+                                       break
+
+                               process.stdin.write(chunk)
+                               await process.stdin.drain()
+
+                       # Close the input once we are done
+                       process.stdin.close()
+
                stdout = []
 
                # Fetch output of command and send it to the logger
@@ -292,6 +346,18 @@ class Backend(object):
                        "kinit", "-k", "-t", keytab, principal, **kwargs,
                )
 
+       async def exists(self, *args, **kwargs):
+               """
+                       Checks whether a file exists
+               """
+               return await asyncio.to_thread(os.path.exists, *args, **kwargs)
+
+       async def makedirs(self, path, **kwargs):
+               """
+                       Creates a new directory
+               """
+               return await asyncio.to_thread(os.makedirs, path, **kwargs)
+
        async def copy(self, src, dst, mode=None):
                """
                        Copies a file from src to dst
@@ -308,17 +374,25 @@ class Backend(object):
                if mode:
                        await asyncio.to_thread(os.chmod, dst, mode)
 
+       async def move(self, src, dst, **kwargs):
+               """
+                       Moves something from src to dst
+               """
+               log.debug("Moving %s to %s" % (src, dst))
+
+               # Create parent directory
+               await self.make_parent_directory(dst)
+
+               # Move!
+               await asyncio.to_thread(shutil.move, src, dst, **kwargs)
+
        async def make_parent_directory(self, path):
                """
                        Creates the parent directory of path
                """
                path = os.path.dirname(path)
 
-               # Create destination path (if it does not exist)
-               try:
-                       await asyncio.to_thread(os.makedirs, path)
-               except FileExistsError:
-                       pass
+               return await self.makedirs(path, exist_ok=True)
 
        async def unlink(self, path):
                """
@@ -348,7 +422,7 @@ class Backend(object):
                        path = os.path.dirname(path)
 
                        # Break if we reached the base path
-                       if path == self.basepath:
+                       if path == self.basepath or path == self.path("tmp"):
                                break
 
                        # Call rmdir()
@@ -359,13 +433,45 @@ class Backend(object):
 
                        log.debug("  Cleaned up %s..." % path)
 
+       async def rmtree(self, path):
+               """
+                       Removes a directory recursively
+               """
+               # Normalize the path
+               path = os.path.abspath(path)
+
+               # Check if the path is within our base directory
+               if not path.startswith(self.basepath):
+                       raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
+               log.debug("Removing %s..." % path)
+
+               try:
+                       await asyncio.to_thread(shutil.rmtree, path)
+
+               # Ignore if path didn't exist in the first place
+               except FileNotFoundError:
+                       pass
+
+       async def chmod(self, *args, **kwargs):
+               return await asyncio.to_thread(os.chmod, *args, **kwargs)
+
        def tempfile(self, mode="w+b", delete=True):
                """
                        Returns an open file handle to a new temporary file
                """
                path = self.path("tmp")
 
-               return tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)
+               return aiofiles.tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)
+
+       def tempdir(self, **kwargs):
+               """
+                       Asynchronously creates a new temporary directory
+               """
+               # Default to our own tmp directory
+               path = self.path("tmp")
+
+               return aiofiles.tempfile.TemporaryDirectory(dir=path, **kwargs)
 
        def _write_tempfile(self, content):
                """
@@ -386,14 +492,11 @@ class Backend(object):
                """
                        Opens a package and returns the archive
                """
-               return await asyncio.to_thread(self._open, path)
-
-       def _open(self, path):
                log.debug("Opening %s..." % path)
 
                # Open the archive
-               with self.pakfire() as p:
-                       return p.open(path)
+               async with self.pakfire() as p:
+                       return await asyncio.to_thread(p.open, path)
 
        @property
        def ssl_context(self):
@@ -455,11 +558,14 @@ class Backend(object):
                        log.warning("No sync target configured")
                        return 0
 
+               # Update the timestamp
+               await self._update_timestamp()
+
                commandline = [
                        "rsync",
 
                        # Show what is being transferred
-                       #"--verbose",
+                       "--verbose",
 
                        # Compress any transferred data
                        "--compress",
@@ -485,6 +591,9 @@ class Backend(object):
                        # Add source & target
                        "%s/" % self.basepath,
                        target,
+
+                       # Sync the .timestamp
+                       "--include=.timestamp",
                ]
 
                # Add all mirrored repositories
@@ -499,6 +608,19 @@ class Backend(object):
                # Run command
                await self.command(*commandline, krb5_auth=True)
 
+       async def _update_timestamp(self):
+               """
+                       Updates the .timestamp file in the root of the exported files
+               """
+               # Make filename
+               path = os.path.join(self.basepath, ".timestamp")
+
+               t = datetime.datetime.utcnow()
+
+               # Write the current time as seconds since epoch
+               async with aiofiles.open(path, mode="w") as f:
+                       await f.write(t.strftime("%s"))
+
 
 def setup_logging():
        """