]> git.ipfire.org Git - pbs.git/commitdiff
Try to make this entire application async
authorMichael Tremer <michael.tremer@ipfire.org>
Sun, 12 Jan 2025 11:51:21 +0000 (11:51 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sun, 12 Jan 2025 11:51:21 +0000 (11:51 +0000)
We will need this so that we won't run out of database file descriptors
as psycopg3 currently can only use select() which does not support a
file descriptor to have a number over 1024.

We will also need this to scale this application better. However,
Tornado has some limitations in its template engine which is why we will
need to replace the template engine with Jinja.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
28 files changed:
src/buildservice/__init__.py
src/buildservice/base.py
src/buildservice/builders.py
src/buildservice/builds.py
src/buildservice/database.py
src/buildservice/distribution.py
src/buildservice/events.py
src/buildservice/jobs.py
src/buildservice/misc.py
src/buildservice/packages.py
src/buildservice/releasemonitoring.py
src/buildservice/repository.py
src/buildservice/sessions.py
src/buildservice/settings.py
src/buildservice/sources.py
src/buildservice/uploads.py
src/buildservice/users.py
src/crontab/pakfire-build-service
src/templates/distros/show.html
src/templates/repos/modules/list.html
src/web/auth.py
src/web/base.py
src/web/builders.py
src/web/builds.py
src/web/distributions.py
src/web/handlers.py
src/web/jobs.py
src/web/packages.py

index 197f5d102564cf8eeb97edbd842c493ba8eff011..93cdd1ba87569d31df31bc6ae2bb12ccd75044c0 100644 (file)
@@ -112,7 +112,8 @@ class Backend(object):
                self.run_periodic_task(300, self.sources.fetch)
 
                # Regularly check for new releases
-               self.run_periodic_task(300, self.monitorings.check)
+               # XXX Disabled for now
+               #self.run_periodic_task(300, self.monitorings.check)
 
                # Cleanup regularly
                self.run_periodic_task(3600, self.cleanup)
@@ -600,7 +601,7 @@ class Backend(object):
                ]
 
                # Add all mirrored repositories
-               for repo in self.repos.mirrored:
+               async for repo in await self.repos.mirrored:
                        path = os.path.relpath(repo.local_path(), self.basepath)
 
                        commandline.append("--include=%s***" % path)
index c42478da00857043ea5aab02e9c4113fb064fa71..ffe8b7166f93866e3ce24ff3f16d3fd5d62fd927 100644 (file)
@@ -48,43 +48,39 @@ class DataObject(Object):
        def __hash__(self):
                return hash(self.id)
 
-       def init(self, id, data=None, **kwargs):
-               self.id = id
-
-               if data:
-                       self.data = data
+       def init(self, data=None, **kwargs):
+               self.data = data
 
                # Set any extra arguments (to populate the cache)
                for arg in kwargs:
                        setattr(self, arg, kwargs[arg])
 
-       @lazy_property
-       def data(self):
-               assert self.table, "Table name is not set"
-               assert self.id
+       @property
+       def id(self):
+               return self.data.id
 
-               return self.db.get("SELECT * FROM %s \
-                       WHERE id = %%s" % self.table, self.id)
+       async def _set_attribute(self, key, val):
+               assert self.table, "Table name not set"
+               assert self.id
 
-       def _set_attribute(self, key, val):
                # Detect if an update is needed
                if self.data[key] == val:
                        return
 
-               self.db.execute("UPDATE %s SET %s = %%s \
+               await self.db.execute("UPDATE %s SET %s = %%s \
                        WHERE id = %%s" % (self.table, key), val, self.id)
 
                # Update the cached attribute
                self.data[key] = val
 
-       def _set_attribute_now(self, key):
-               """
-                       Sets the given key to CURRENT_TIMESTAMP
-               """
-               res = self.db.get("UPDATE %s SET %s = CURRENT_TIMESTAMP \
+       async def _set_attribute_now(self, key):
+               assert self.table, "Table name not set"
+               assert self.id
+
+               res = await self.db.execute("UPDATE %s SET %s = CURRENT_TIMESTAMP \
                        WHERE id = %%s RETURNING %s" % (self.table, key, key), self.id)
 
-               # Update cached attribute
+               # Update the cached attribute
                if res:
                        self.data[key] = res[key]
 
index c51c3a04ae65c54c7ceb0cbd444db8ad0ea601c2..b6049477cf5f0d4fff6d324177be933b91682006 100644 (file)
@@ -17,33 +17,27 @@ class Builders(base.Object):
        # Stores any control connections to builders
        connections = {}
 
-       def _get_builder(self, query, *args):
-               res = self.db.get(query, *args)
+       async def _get_builders(self, *args, **kwargs):
+               return await self.db.fetch_many(Builder, *args, **kwargs)
 
-               if res:
-                       return Builder(self.backend, res.id, data=res)
-
-       def _get_builders(self, query, *args):
-               res = self.db.query(query, *args)
-
-               for row in res:
-                       yield Builder(self.backend, row.id, data=row)
+       async def _get_builder(self, *args, **kwargs):
+               return await self.db.fetch_one(Builder, *args, **kwargs)
 
-       def __iter__(self):
-               builders = self._get_builders("SELECT * FROM builders \
+       async def __aiter__(self):
+               builders = await self._get_builders("SELECT * FROM builders \
                        WHERE deleted_at IS NULL ORDER BY name")
 
-               return iter(builders)
+               return aiter(builders)
 
        def init(self):
                # Initialize stats
                self.stats = BuildersStats(self.backend)
 
-       def create(self, name, user=None):
+       async def create(self, name, user=None):
                """
                        Creates a new builder.
                """
-               builder = self._get_builder("""
+               builder = await self._get_builder("""
                        INSERT INTO
                                builders
                        (
@@ -61,8 +55,8 @@ class Builders(base.Object):
 
                return builder
 
-       def get_by_id(self, id):
-               return self._get_builder("""
+       async def get_by_id(self, id):
+               return await self._get_builder("""
                        SELECT
                                *
                        FROM
@@ -72,8 +66,8 @@ class Builders(base.Object):
                        """, id,
                )
 
-       def get_by_name(self, name):
-               return self._get_builder("""
+       async def get_by_name(self, name):
+               return await self._get_builder("""
                        SELECT
                                *
                        FROM
@@ -117,7 +111,7 @@ class Builders(base.Object):
                threshold = datetime.timedelta(minutes=5)
 
                # Fetch all builders
-               builders = [b for b in self]
+               builders = [b async for b in self]
 
                # Fetch the priority for each builder
                builders = dict(
@@ -338,7 +332,7 @@ class Builder(base.DataObject):
                        Logs some stats about this builder
                """
                # Update information
-               self.db.execute("""
+               await self.db.execute("""
                        UPDATE
                                builders
                        SET
@@ -361,7 +355,7 @@ class Builder(base.DataObject):
                )
 
                # Log Stats
-               stats = self._get_stats("""
+               stats = await self._get_stats("""
                        INSERT INTO
                                builder_stats
                        (
index f7688cc8a1ef6e0b3443468516a455676ae146c4..964d151f0a8217f9083aaa959969250f8f984b50 100644 (file)
@@ -17,12 +17,12 @@ from .decorators import *
 log = logging.getLogger("pbs.builds")
 
 class Builds(base.Object):
-       def _get_build(self, query, *args, **kwargs):
-               return self.db.fetch_one(Build, query, *args, **kwargs)
-
        def _get_builds(self, query, *args, **kwargs):
                return self.db.fetch_many(Build, query, *args, **kwargs)
 
+       async def _get_build(self, query, *args, **kwargs):
+               return await self.db.fetch_one(Build, query, *args, **kwargs)
+
        def __len__(self):
                res = self.db.get("""
                        SELECT
@@ -38,8 +38,8 @@ class Builds(base.Object):
 
                return res.builds
 
-       def get_by_id(self, id):
-               return self._get_build("""
+       async def get_by_id(self, id):
+               return await self._get_build("""
                        SELECT
                                *
                        FROM
@@ -49,8 +49,8 @@ class Builds(base.Object):
                        """, id,
                )
 
-       def get_by_uuid(self, uuid):
-               return self._get_build("""
+       async def get_by_uuid(self, uuid):
+               return await self._get_build("""
                        SELECT
                                *
                        FROM
@@ -89,7 +89,7 @@ class Builds(base.Object):
                """
                        Returns all builds by this name
                """
-               builds = self._get_builds("""
+               return self._get_builds("""
                        SELECT
                                builds.*
                        FROM
@@ -107,10 +107,8 @@ class Builds(base.Object):
                        """, name,
                )
 
-               return list(builds)
-
        def get_release_builds_by_name(self, name, limit=None):
-               builds = self._get_builds("""
+               return self._get_builds("""
                        WITH builds AS (
                                SELECT
                                        builds.*,
@@ -143,10 +141,8 @@ class Builds(base.Object):
                        """, name, limit,
                )
 
-               return list(builds)
-
        def get_scratch_builds_by_name(self, name, limit=None):
-               builds = self._get_builds("""
+               return self._get_builds("""
                        WITH builds AS (
                                SELECT
                                        builds.*,
@@ -181,8 +177,6 @@ class Builds(base.Object):
                        """, name, limit,
                )
 
-               return list(builds)
-
        def get_recent(self, name=None, limit=None, offset=None):
                """
                        Returns the most recent (non-test) builds
@@ -190,7 +184,7 @@ class Builds(base.Object):
                if name:
                        return self.get_recent_by_name(name, limit=limit, offset=offset)
 
-               builds = self._get_builds("""
+               return self._get_builds("""
                        SELECT
                                *
                        FROM
@@ -208,13 +202,11 @@ class Builds(base.Object):
                        limit, offset,
                )
 
-               return list(builds)
-
        def get_recent_by_name(self, name, limit=None, offset=None):
                """
                        Returns the most recent (non-test) builds
                """
-               builds = self._get_builds("""
+               return self._get_builds("""
                        SELECT
                                builds.*
                        FROM
@@ -238,13 +230,11 @@ class Builds(base.Object):
                        name, limit, offset,
                )
 
-               return list(builds)
-
        def get_by_package_uuids(self, uuids):
                """
                        Returns a list of builds that contain the given packages
                """
-               builds = self._get_builds("""
+               return self._get_builds("""
                        SELECT
                                DISTINCT builds.*
                        FROM
@@ -274,8 +264,6 @@ class Builds(base.Object):
                        """, uuids, uuids,
                )
 
-               return list(builds)
-
        async def create(self, repo, package, owner=None, group=None, test=False,
                        disable_test_builds=False, timeout=None):
                """
@@ -293,7 +281,7 @@ class Builds(base.Object):
                if timeout is None:
                        timeout = datetime.timedelta(hours=3)
 
-               build = self._get_build("""
+               build = await self._get_build("""
                        INSERT INTO
                                builds
                        (
@@ -325,14 +313,14 @@ class Builds(base.Object):
                        group.builds.append(build)
 
                # Create all jobs
-               build._create_jobs(timeout=timeout)
+               await build._create_jobs(timeout=timeout)
 
                if not build.is_test():
                        # Deprecate previous builds
-                       build._deprecate_others()
+                       await build._deprecate_others()
 
                        # Add watchers
-                       build._add_watchers()
+                       await build._add_watchers()
 
                        # Add the build into its repository
                        await repo.add_build(build, user=owner)
@@ -433,7 +421,7 @@ class Build(base.DataObject):
                # Mark as deleted
                self._set_attribute_now("deleted_at")
                if user:
-                       self._set_attribute("deleted_by", user)
+                       await self._set_attribute("deleted_by", user)
 
                # Delete source package
                await self.pkg.delete(user=user)
@@ -467,6 +455,8 @@ class Build(base.DataObject):
        def finished_at(self):
                return self.data.finished_at
 
+       # Owner
+
        def get_owner(self):
                """
                        The owner of this build.
@@ -474,10 +464,8 @@ class Build(base.DataObject):
                if self.data.owner_id:
                        return self.backend.users.get_by_id(self.data.owner_id)
 
-       def set_owner(self, owner):
-               self._set_attribute("owner_id", owner)
-
-       owner = lazy_property(get_owner, set_owner)
+       async def set_owner(self, owner):
+               await self._set_attribute("owner_id", owner)
 
        # Build Repository
 
@@ -502,13 +490,13 @@ class Build(base.DataObject):
        def is_broken(self):
                return self.state == "broken"
 
-       def set_severity(self, severity):
-               self._set_attribute("severity", severity)
+       # Severity
 
        def get_severity(self):
                return self.data.severity
 
-       severity = property(get_severity, set_severity)
+       async def set_severity(self, severity):
+               await self._set_attribute("severity", severity)
 
        @lazy_property
        def commit(self):
@@ -581,13 +569,8 @@ class Build(base.DataObject):
 
        # Jobs
 
-       def _get_jobs(self, query, *args):
-               ret = []
-               for job in self.backend.jobs._get_jobs(query, *args):
-                       job.build = self
-                       ret.append(job)
-
-               return ret
+       async def _get_jobs(self, *args, **kwargs):
+               return self.backend.jobs._get_jobs(*args, build=self, **kwargs)
 
        @property
        def jobs(self):
@@ -672,12 +655,12 @@ class Build(base.DataObject):
 
        # Points
 
-       def add_points(self, points, user=None):
+       async def add_points(self, points, user=None):
                """
                        Add points (can be negative)
                """
                # Log points
-               self.db.execute("""
+               await self.db.execute("""
                        INSERT INTO
                                build_points
                        (
@@ -693,7 +676,7 @@ class Build(base.DataObject):
                )
 
                # Update the cache
-               self._set_attribute("points", self.points + points)
+               await self._set_attribute("points", self.points + points)
 
        @property
        def points(self):
@@ -705,8 +688,8 @@ class Build(base.DataObject):
        ## Watchers
 
        @lazy_property
-       def watchers(self):
-               users = self.backend.users._get_users("""
+       async def watchers(self):
+               users = await self.backend.users._get_users("""
                        SELECT
                                users.*
                        FROM
@@ -724,11 +707,11 @@ class Build(base.DataObject):
 
                return set(users)
 
-       def add_watcher(self, user):
+       async def add_watcher(self, user):
                """
                        Adds a watcher to this build
                """
-               self.db.execute("""
+               await self.db.execute("""
                        INSERT INTO
                                build_watchers(
                                        build_id,
@@ -746,11 +729,11 @@ class Build(base.DataObject):
                # Add to cache
                self.watchers.add(user)
 
-       def remove_watcher(self, user):
+       async def remove_watcher(self, user):
                """
                        Removes a watcher from this build
                """
-               self.db.execute("""
+               await self.db.execute("""
                        UPDATE
                                build_watchers
                        SET
@@ -770,7 +753,7 @@ class Build(base.DataObject):
                except KeyError:
                        pass
 
-       def _add_watchers(self):
+       async def _add_watchers(self):
                """
                        Called when a new build is created and automatically adds any watchers
                """
@@ -797,15 +780,15 @@ class Build(base.DataObject):
                        log.error("Build %s has failed" % self)
 
                # Mark as finished
-               self._set_attribute_now("finished_at")
+               await self._set_attribute_now("finished_at")
 
                # Mark as failed if the build was not successful
                if not success:
-                       self._set_attribute("failed", True)
+                       await self._set_attribute("failed", True)
 
                # Award some negative points on failure
                if not success:
-                       self.add_points(-1)
+                       await self.add_points(-1)
 
                # Notify everyone this build has finished...
                if success:
@@ -859,16 +842,16 @@ class Build(base.DataObject):
                                continue
 
                        # Send an email to the user
-                       user.send_email(*args, build=self, **kwargs)
+                       await user.send_email(*args, build=self, **kwargs)
 
        # Repositories
 
        @lazy_property
-       def repos(self):
+       async def repos(self):
                """
                        Return a list of all repositories this package is in
                """
-               repos = self.backend.repos._get_repositories("""
+               repos = await self.backend.repos._get_repositories("""
                        SELECT
                                repositories.*
                        FROM
@@ -931,8 +914,8 @@ class Build(base.DataObject):
        ## Bugs
 
        @lazy_property
-       def bug_ids(self):
-               rows = self.db.query("""
+       async def bug_ids(self):
+               rows = await self.db.query("""
                        SELECT
                                bug_id
                        FROM
@@ -948,8 +931,8 @@ class Build(base.DataObject):
 
                return set([row.bug_id for row in rows])
 
-       def add_bug(self, bug_id, user=None):
-               self.db.execute("""
+       async def add_bug(self, bug_id, user=None):
+               await self.db.execute("""
                        INSERT INTO
                                build_bugs
                        (
@@ -975,8 +958,8 @@ class Build(base.DataObject):
                # Add to cache
                self.bug_ids.add(bug_id)
 
-       def remove_bug(self, bug_id, user):
-               self.db.execute("""
+       async def remove_bug(self, bug_id, user):
+               await self.db.execute("""
                        UPDATE
                                build_bugs
                        SET
@@ -1004,11 +987,11 @@ class Build(base.DataObject):
        # Monitoring Release
 
        @lazy_property
-       def monitoring_release(self):
+       async def monitoring_release(self):
                """
                        Returns the Monitoring Release
                """
-               return self.backend.monitorings._get_release("""
+               return await self.backend.monitorings._get_release("""
                        SELECT
                                *
                        FROM
@@ -1023,13 +1006,13 @@ class Build(base.DataObject):
 
        # Deprecation
 
-       def _deprecate_others(self):
+       async def _deprecate_others(self):
                """
                        Called when this build is being created.
 
                        This method will find similar builds and automatically deprecate them.
                """
-               builds = self.backend.builds._get_builds("""
+               builds = await self.backend.builds._get_builds("""
                        SELECT
                                builds.*
 
@@ -1079,9 +1062,9 @@ class Build(base.DataObject):
 
                # Deprecate all builds
                for build in builds:
-                       build.deprecate(build=self)
+                       await build.deprecate(build=self)
 
-       def deprecate(self, build=None, user=None):
+       async def deprecate(self, build=None, user=None):
                """
                        Called when a build needs to be deprecated
                """
@@ -1095,9 +1078,9 @@ class Build(base.DataObject):
                        user = build.owner
 
                # Mark as deprecated
-               self._set_attribute_now("deprecated_at")
+               await self._set_attribute_now("deprecated_at")
                if user:
-                       self._set_attribute("deprecated_by", user)
+                       await self._set_attribute("deprecated_by", user)
 
                # Store the build
                if build:
@@ -1114,25 +1097,25 @@ class Build(base.DataObject):
                return False
 
        @lazy_property
-       def deprecated_by(self):
+       async def deprecated_by(self):
                if self.data.deprecated_by:
-                       return self.backend.users.get_by_id(self.data.deprecated_by)
+                       return await self.backend.users.get_by_id(self.data.deprecated_by)
 
-       def get_deprecating_build(self):
-               if self.data.deprecating_build_id:
-                       return self.backend.builds.get_by_id(self.data.deprecating_build_id)
+       # Deprecating Build
 
-       def set_deprecating_build(self, build):
-               self._set_attribute("deprecating_build_id", build)
+       async def get_deprecating_build(self):
+               if self.data.deprecating_build_id:
+                       return await self.backend.builds.get_by_id(self.data.deprecating_build_id)
 
-       deprecating_build = lazy_property(get_deprecating_build, set_deprecating_build)
+       async def set_deprecating_build(self, build):
+               await self._set_attribute("deprecating_build_id", build)
 
        @lazy_property
-       def deprecated_builds(self):
+       async def deprecated_builds(self):
                """
                        Returns a list of builds that were deprecated by this build
                """
-               builds = self.backend.builds._get_builds("""
+               builds = await self.backend.builds._get_builds("""
                        SELECT
                                *
                        FROM
@@ -1173,8 +1156,8 @@ class Build(base.DataObject):
                return False
 
        @lazy_property
-       def test_build_for(self):
-               return self.backend.builds._get_build("""
+       async def test_build_for(self):
+               return await self.backend.builds._get_build("""
                        WITH build_test_builds AS (
                                SELECT
                                        builds.id AS build_id,
@@ -1524,20 +1507,14 @@ class Group(base.DataObject):
 
 
 class Comments(base.Object):
-       def _get_comments(self, query, *args):
-               res = self.db.query(query, *args)
-
-               for row in res:
-                       yield Comment(self.backend, row.id, data=row)
+       def _get_comments(self, *args, **kwargs):
+               return self.db.fetch_many(Comment, *args, **kwargs)
 
-       def _get_comment(self, query, *args):
-               res = self.db.get(query, *args)
-
-               if res:
-                       return Comment(self.backend, res.id, data=res)
+       def _get_comment(self, *args, **kwargs):
+               return self.db.fetch_one(Comment, *args, **kwargs)
 
-       def get_by_id(self, id):
-               return self._get_comment("""
+       async def get_by_id(self, id):
+               return await self._get_comment("""
                        SELECT
                                *
                        FROM
index bd5b19f3dd87e5f2190ed0aac099f2514cd6f68d..3feaa0cdf2c592fa8fd3910be2a832dadbfddb85 100644 (file)
@@ -9,7 +9,6 @@
 """
 
 import asyncio
-import itertools
 import logging
 import psycopg
 import psycopg_pool
@@ -44,29 +43,29 @@ class Connection(object):
                self.__connections = {}
 
                # Create a connection pool
-               self.pool = psycopg_pool.ConnectionPool(
+               self.pool = psycopg_pool.AsyncConnectionPool(
                        "postgresql://%s:%s@%s/%s" % (user, password, host, database),
 
                        # Callback to configure any new connections
                        configure=self.__configure,
 
                        # Set limits for min/max connections in the pool
-                       min_size=8,
-                       max_size=1024,
+                       min_size=4,
+                       max_size=32,
 
                        # Give clients up to one minute to retrieve a connection
                        timeout=60,
 
-                       # Close connections after they have been idle for 5 seconds
-                       max_idle=5,
+                       # Close connections after they have been idle for a few minutes
+                       max_idle=120,
                )
 
-       def __configure(self, conn):
+       async def __configure(self, conn):
                """
                        Configures any newly opened connections
                """
                # Enable autocommit
-               conn.autocommit = True
+               await conn.set_autocommit(True)
 
                # Return any rows as dicts
                conn.row_factory = psycopg.rows.dict_row
@@ -74,7 +73,7 @@ class Connection(object):
                # Automatically convert DataObjects
                conn.adapters.register_dumper(base.DataObject, base.DataObjectDumper)
 
-       def connection(self, *args, **kwargs):
+       async def connection(self, *args, **kwargs):
                """
                        Returns a connection from the pool
                """
@@ -90,16 +89,24 @@ class Connection(object):
                        pass
 
                # Fetch a new connection from the pool
-               conn = self.__connections[task] = self.pool.getconn(*args, **kwargs)
+               conn = self.__connections[task] = await self.pool.getconn(*args, **kwargs)
 
                log.debug("Assigning database connection %s to %s" % (conn, task))
 
                # When the task finishes, release the connection
-               task.add_done_callback(self.__release_connection)
+               task.add_done_callback(self.release_connection)
 
                return conn
 
-       def __release_connection(self, task):
+       def release_connection(self, task):
+               """
+                       Called when a task that requested a connection has finished.
+
+                       This method will schedule that the connection is being returned into the pool.
+               """
+               self.backend.run_task(self.__release_connection, task)
+
+       async def __release_connection(self, task):
                # Retrieve the connection
                try:
                        conn = self.__connections[task]
@@ -112,9 +119,9 @@ class Connection(object):
                del self.__connections[task]
 
                # Return the connection back into the pool
-               self.pool.putconn(conn)
+               await self.pool.putconn(conn)
 
-       def _execute(self, cursor, execute, query, parameters):
+       async def _execute(self, cursor, execute, query, parameters):
                # Store the time we started this query
                t = time.monotonic()
 
@@ -124,7 +131,7 @@ class Connection(object):
                        pass
 
                # Execute the query
-               execute(query, parameters)
+               await execute(query, parameters)
 
                # How long did this take?
                elapsed = time.monotonic() - t
@@ -132,73 +139,76 @@ class Connection(object):
                # Log the query time
                log.debug("  Query time: %.2fms" % (elapsed * 1000))
 
-       def query(self, query, *parameters, **kwparameters):
+       async def query(self, query, *parameters, **kwparameters):
                """
                        Returns a row list for the given query and parameters.
                """
-               conn = self.connection()
+               conn = await self.connection()
 
-               with conn.cursor() as cursor:
-                       self._execute(cursor, cursor.execute, query, parameters or kwparameters)
+               async with conn.cursor() as cursor:
+                       await self._execute(cursor, cursor.execute, query, parameters or kwparameters)
 
-                       return [Row(row) for row in cursor]
+                       async for row in cursor:
+                               yield Row(row)
 
-       def get(self, query, *parameters, **kwparameters):
+       async def get(self, query, *parameters, **kwparameters):
                """
                        Returns the first row returned for the given query.
                """
-               rows = self.query(query, *parameters, **kwparameters)
-               if not rows:
-                       return None
-               elif len(rows) > 1:
-                       raise Exception("Multiple rows returned for Database.get() query")
-               else:
-                       return rows[0]
+               rows = []
+
+               async for row in self.query(query, *parameters, **kwparameters):
+                       rows.append(row)
+
+                       if len(rows) > 1:
+                               raise Exception("Multiple rows returned for Database.get() query")
+
+               return rows[0] if rows else None
 
-       def execute(self, query, *parameters, **kwparameters):
+       async def execute(self, query, *parameters, **kwparameters):
                """
                        Executes the given query.
                """
-               conn = self.connection()
+               conn = await self.connection()
 
-               with conn.cursor() as cursor:
-                       self._execute(cursor, cursor.execute, query, parameters or kwparameters)
+               async with conn.cursor() as cursor:
+                       await self._execute(cursor, cursor.execute, query, parameters or kwparameters)
 
-       def executemany(self, query, parameters):
+       async def executemany(self, query, parameters):
                """
                        Executes the given query against all the given param sequences.
                """
-               conn = self.connection()
+               conn = await self.connection()
 
-               with conn.cursor() as cursor:
-                       self._execute(cursor, cursor.executemany, query, parameters)
+               async with conn.cursor() as cursor:
+                       await self._execute(cursor, cursor.executemany, query, parameters)
 
-       def transaction(self):
+       async def transaction(self):
                """
                        Creates a new transaction on the current tasks' connection
                """
-               conn = self.connection()
+               conn = await self.connection()
 
                return conn.transaction()
 
-       def fetch_one(self, cls, query, *args, **kwargs):
+       async def fetch_one(self, cls, query, *args, **kwargs):
                """
                        Takes a class and a query and will return one object of that class
                """
                # Execute the query
-               res = self.get(query, *args)
+               res = await self.get(query, *args)
 
                # Return an object (if possible)
                if res:
-                       return cls(self.backend, res.id, res, **kwargs)
+                       return cls(self.backend, data=res, **kwargs)
 
-       def fetch_many(self, cls, query, *args, **kwargs):
+       async def fetch_many(self, cls, query, *args, **kwargs):
                # Execute the query
                res = self.query(query, *args)
 
                # Return a generator with objects
-               for row in res:
-                       yield cls(self.backend, row.id, row, **kwargs)
+               async for row in res:
+                       yield cls(self.backend, data=row, **kwargs)
 
 
 class Row(dict):
index b97dd9c13f17053317d1d69b0cc0f2611f8a30ef..6037b7033be731e56e73ce7b42e5b26104ab5852 100644 (file)
@@ -14,10 +14,10 @@ class Distributions(base.Object):
        def _get_distributions(self, query, *args):
                return self.db.fetch_many(Distribution, query, *args)
 
-       def _get_distribution(self, query, *args):
-               return self.db.fetch_one(Distribution, query, *args)
+       async def _get_distribution(self, query, *args):
+               return await self.db.fetch_one(Distribution, query, *args)
 
-       def __iter__(self):
+       def __aiter__(self):
                distros = self._get_distributions("""
                        SELECT
                                *
@@ -30,10 +30,10 @@ class Distributions(base.Object):
                        """,
                )
 
-               return iter(distros)
+               return aiter(distros)
 
-       def get_by_id(self, distro_id):
-               return self._get_distribution("""
+       async def get_by_id(self, distro_id):
+               return await self._get_distribution("""
                        SELECT
                                *
                        FROM
@@ -43,8 +43,8 @@ class Distributions(base.Object):
                        """, distro_id,
                )
 
-       def get_by_slug(self, slug):
-               return self._get_distribution("""
+       async def get_by_slug(self, slug):
+               return await self._get_distribution("""
                        SELECT
                                *
                        FROM
@@ -56,8 +56,8 @@ class Distributions(base.Object):
                        """, slug,
                )
 
-       def get_by_tag(self, tag):
-               return self._get_distribution("""
+       async def get_by_tag(self, tag):
+               return await self._get_distribution("""
                        SELECT
                                *
                        FROM
@@ -69,9 +69,9 @@ class Distributions(base.Object):
                        """, tag,
                )
 
-       def create(self, name, distro_id, version_id):
+       async def create(self, name, distro_id, version_id):
                # Insert into the database
-               return self._get_distribution("""
+               return await self._get_distribution("""
                        INSERT INTO
                                distributions
                        (
@@ -281,9 +281,8 @@ class Distribution(base.DataObject):
                # Must be admin
                return user.is_admin()
 
-       @lazy_property
-       def repos(self):
-               repos = self.backend.repos._get_repositories("""
+       def get_repos(self):
+               return self.backend.repos._get_repositories("""
                        SELECT
                                *
                        FROM
@@ -295,9 +294,10 @@ class Distribution(base.DataObject):
                        AND
                                owner_id IS NULL""",
                        self.id,
-               )
 
-               return sorted(repos)
+                       # Populate cache
+                       distro=self,
+               )
 
        def get_repo(self, slug):
                repo = self.backend.repos._get_repository("""
@@ -315,11 +315,10 @@ class Distribution(base.DataObject):
                                slug = %s""",
                        self.id,
                        slug,
-               )
 
-               # Cache
-               if repo:
-                       repo.distro = self
+                       # Populate cache
+                       distro=self,
+               )
 
                return repo
 
@@ -339,7 +338,7 @@ class Distribution(base.DataObject):
                """
                        Returns all release builds that match the name
                """
-               builds = self.backend.builds._get_builds("""
+               return self.backend.builds._get_builds("""
                        SELECT
                                builds.*
                        FROM
@@ -363,13 +362,10 @@ class Distribution(base.DataObject):
                        """, self.id, name, limit,
                )
 
-               return builds
-
        # Sources
 
-       @lazy_property
-       def sources(self):
-               sources = self.backend.sources._get_sources("""
+       def get_sources(self):
+               return self.backend.sources._get_sources("""
                        SELECT
                                sources.*
                        FROM
@@ -383,12 +379,10 @@ class Distribution(base.DataObject):
                        """, self.id,
                )
 
-               return list(sources)
-
        # Releases
 
        def get_releases(self, limit=None, offset=None):
-               releases = self.backend.distros.releases._get_releases("""
+               return self.backend.distros.releases._get_releases("""
                        SELECT
                                *
                        FROM
@@ -409,8 +403,6 @@ class Distribution(base.DataObject):
                        distro=self,
                )
 
-               return list(releases)
-
        def get_release(self, slug):
                return self.backend.distros.releases._get_release("""
                        SELECT
@@ -426,12 +418,11 @@ class Distribution(base.DataObject):
                        """, self.id, slug,
                )
 
-       @lazy_property
-       def latest_release(self):
+       async def get_latest_release(self):
                """
                        Returns the latest and published release
                """
-               return self.backend.distros.releases._get_release("""
+               return await self.backend.distros.releases._get_release("""
                        SELECT
                                *
                        FROM
@@ -456,11 +447,11 @@ class Releases(base.Object):
        def _get_releases(self, query, *args, **kwargs):
                return self.db.fetch_many(Release, query, *args, **kwargs)
 
-       def _get_release(self, query, *args, **kwargs):
-               return self.db.fetch_one(Release, query, *args, **kwargs)
+       async def _get_release(self, query, *args, **kwargs):
+               return await self.db.fetch_one(Release, query, *args, **kwargs)
 
-       def get_by_id(self, id):
-               return self._get_release("""
+       async def get_by_id(self, id):
+               return await self._get_release("""
                        SELECT
                                *
                        FROM
@@ -470,14 +461,14 @@ class Releases(base.Object):
                        """, id,
                )
 
-       def create(self, distro, name, user, stable=False):
+       async def create(self, distro, name, user, stable=False):
                """
                        Creates a new release
                """
                # Create a slug
                slug = misc.normalize(name)
 
-               release = self._get_release("""
+               release = await self._get_release("""
                        INSERT INTO
                                releases
                        (
@@ -656,8 +647,8 @@ class Images(base.Object):
        def _get_images(self, query, *args, **kwargs):
                return self.db.fetch_many(Image, query, *args, **kwargs)
 
-       def _get_image(self, query, *args, **kwargs):
-               return self.db.fetch_one(Image, query, *args, **kwargs)
+       async def _get_image(self, query, *args, **kwargs):
+               return await self.db.fetch_one(Image, query, *args, **kwargs)
 
 
 class Image(base.DataObject):
index 886ea4db1fb8bd39e024b2de7ad2f2cbea489e6e..ee06e5a849d6b971dfa1c3359c571f4c6fb222d6 100644 (file)
@@ -911,8 +911,8 @@ class Events(base.Object):
        def map(self):
                return {
                        # Builds
-                       "build"    : self.backend.builds.get_by_id,
-                       "by_build" : self.backend.builds.get_by_id,
+                       "build"         : self.backend.builds.get_by_id,
+                       "by_build"      : self.backend.builds.get_by_id,
 
                        # Build Comments
                        "build_comment" : self.backend.builds.comments.get_by_id,
@@ -930,17 +930,17 @@ class Events(base.Object):
                        "release"       : self.backend.distros.releases.get_by_id,
 
                        # Repositories
-                       "repository" : self.backend.repos.get_by_id,
+                       "repository"    : self.backend.repos.get_by_id,
 
                        # Builders
                        "builder"       : self.backend.builders.get_by_id,
 
                        # Users
-                       "user"    : self.backend.users.get_by_id,
-                       "by_user" : self.backend.users.get_by_id,
+                       "user"          : self.backend.users.get_by_id,
+                       "by_user"       : self.backend.users.get_by_id,
                }
 
-       def expand(self, events):
+       async def expand(self, events):
                """
                        Expands any log events
                """
@@ -968,7 +968,7 @@ class Events(base.Object):
 
                                # Call the expand function on cache miss
                                except KeyError:
-                                       value = expand(key)
+                                       value = await expand(key)
 
                                        # Store the expanded value
                                        try:
@@ -981,7 +981,7 @@ class Events(base.Object):
 
                        yield Event(self.backend, event)
 
-       def __call__(self, priority=None, offset=None, limit=None,
+       async def __call__(self, priority=None, offset=None, limit=None,
                        build=None, builder=None, mirror=None, user=None):
                """
                        Returns all events filtered by the given criteria
@@ -1015,7 +1015,7 @@ class Events(base.Object):
                        values.append(priority)
 
                # Fetch all events
-               events = self.db.query(
+               events = await self.db.query(
                        """
                        %s
 
@@ -1039,7 +1039,7 @@ class Events(base.Object):
                )
 
                # Expand all events
-               return self.expand(events)
+               return await self.expand(events)
 
 
 class Event(base.Object):
index b2bb65816cf3cb676a485cae3f29bd06aafb6edd..c1af558bd1aad628d02b558f546b1a6c6ec1037e 100644 (file)
@@ -78,14 +78,14 @@ class Jobs(base.Object):
                # Setup queue
                self.queue = Queue(self.backend)
 
-       def _get_job(self, query, *args, **kwargs):
-               return self.db.fetch_one(Job, query, *args, **kwargs)
+       def _get_jobs(self, *args, **kwargs):
+               return self.db.fetch_many(Job, *args, **kwargs)
 
-       def _get_jobs(self, query, *args, **kwargs):
-               return self.db.fetch_many(Job, query, *args, **kwargs)
+       async def _get_job(self, *args, **kwargs):
+               return await self.db.fetch_one(Job, *args, **kwargs)
 
-       def create(self, build, arch, superseeds=None, timeout=None):
-               job = self._get_job("""
+       async def create(self, build, arch, superseeds=None, timeout=None):
+               job = await self._get_job("""
                        INSERT INTO
                                jobs
                        (
@@ -112,11 +112,11 @@ class Jobs(base.Object):
 
                return job
 
-       def get_by_id(self, id):
-               return self._get_job("SELECT * FROM jobs WHERE id = %s", id)
+       async def get_by_id(self, id):
+               return await self._get_job("SELECT * FROM jobs WHERE id = %s", id)
 
-       def get_by_uuid(self, uuid):
-               return self._get_job("SELECT * FROM jobs WHERE uuid = %s", uuid)
+       async def get_by_uuid(self, uuid):
+               return await self._get_job("SELECT * FROM jobs WHERE uuid = %s", uuid)
 
        def get_finished(self, failed_only=False, limit=None, offset=None):
                """
@@ -160,11 +160,10 @@ class Jobs(base.Object):
                                        %s
                        """, limit, offset)
 
-               return list(jobs)
+               return jobs
 
-       @property
-       def running(self):
-               jobs = self._get_jobs("""
+       def get_running(self):
+               return self._get_jobs("""
                        SELECT
                                jobs.*
                        FROM
@@ -179,8 +178,6 @@ class Jobs(base.Object):
                                started_at DESC
                """)
 
-               return list(jobs)
-
        async def launch(self, jobs):
                """
                        Called to launch all given jobs
@@ -220,7 +217,7 @@ class Jobs(base.Object):
                """)
 
                # Abort them all...
-               for job in jobs:
+               async for job in jobs:
                        await job.abort()
 
 
@@ -228,13 +225,13 @@ class Queue(base.Object):
        # Locked when the queue is being processed
        lock = asyncio.Lock()
 
-       def __iter__(self):
-               jobs = self.get_jobs()
+       async def __aiter__(self):
+               jobs = await self.get_jobs()
 
-               return iter(jobs)
+               return aiter(jobs)
 
-       def __len__(self):
-               res = self.db.get("""
+       async def get_length(self):
+               res = await self.db.get("""
                        WITH %s
 
                        SELECT
@@ -248,8 +245,8 @@ class Queue(base.Object):
 
                return 0
 
-       def get_jobs(self, limit=None):
-               jobs = self.backend.jobs._get_jobs("""
+       async def get_jobs(self, limit=None):
+               jobs = await self.backend.jobs._get_jobs("""
                        WITH %s
 
                        SELECT
@@ -263,11 +260,11 @@ class Queue(base.Object):
 
                return list(jobs)
 
-       def get_jobs_for_builder(self, builder, limit=None):
+       async def get_jobs_for_builder(self, builder, limit=None):
                """
                        Returns all jobs that the given builder can process.
                """
-               return self.backend.jobs._get_jobs("""
+               return await self.backend.jobs._get_jobs("""
                        WITH %s
 
                        SELECT
@@ -410,8 +407,8 @@ class Job(base.DataObject):
        # Packages
 
        @lazy_property
-       def packages(self):
-               packages = self.backend.packages._get_packages("""
+       async def packages(self):
+               packages = await self.backend.packages._get_packages("""
                        SELECT
                                packages.*
                        FROM
index dec4dbc562aff2f35cfdb23de6cbf7eaa32c6507..544274463885759aed5653ce745d93e5b70b19eb 100644 (file)
@@ -54,13 +54,13 @@ def format_size(s):
 
                s /= 1024
 
-def group(items, key):
+async def group(items, key):
        """
                This function takes some iterable and returns it grouped by key.
        """
        result = {}
 
-       for item in items:
+       async for item in items:
                if callable(key):
                        value = key(item)
                else:
index d0d714081a1575a498392d57c1c13149212edf89..3f24edb4b8ee1fefca65bc38b11e11bc028f3921 100644 (file)
@@ -22,17 +22,11 @@ from .errors import *
 log = logging.getLogger("pbs.packages")
 
 class Packages(base.Object):
-       def _get_package(self, query, *args):
-               res = self.db.get(query, *args)
+       async def _get_packages(self, *args, **kwargs):
+               return await self.db.fetch_many(Package, *args, **kwargs)
 
-               if res:
-                       return Package(self.backend, res.id, data=res)
-
-       def _get_packages(self, query, *args):
-               res = self.db.query(query, *args)
-
-               for row in res:
-                       yield Package(self.backend, row.id, data=row)
+       async def _get_package(self, *args, **kwargs):
+               return await self.db.fetch_one(Package, *args, **kwargs)
 
        def get_list(self):
                """
@@ -61,12 +55,11 @@ class Packages(base.Object):
                        """, "src",
                )
 
-       def get_by_id(self, pkg_id):
-               return self._get_package("SELECT * FROM packages \
-                       WHERE id = %s", pkg_id)
+       async def get_by_id(self, id):
+               return await self._get_package("SELECT * FROM packages WHERE id = %s", id)
 
-       def get_by_uuid(self, uuid):
-               return self._get_package("""
+       async def get_by_uuid(self, uuid):
+               return await self._get_package("""
                        SELECT
                                *
                        FROM
@@ -86,7 +79,7 @@ class Packages(base.Object):
                path = buildid_to_path(buildid)
 
                # Search for the package containing this file
-               for package in self.search_by_filename(path, limit=1):
+               async for package in self.search_by_filename(path, limit=1):
                        log.debug("Found debuginfo for %s in %s" % (buildid, package))
 
                        return package
@@ -105,7 +98,7 @@ class Packages(base.Object):
                package = await asyncio.to_thread(archive.get_package)
 
                # Check if a package with this UUID exists
-               pkg = self.get_by_uuid(package.uuid)
+               pkg = await self.get_by_uuid(package.uuid)
                if pkg:
                        return pkg
 
@@ -113,13 +106,14 @@ class Packages(base.Object):
 
                # Find a matching distribution
                if not distro:
-                       distro = self.backend.distros.get_by_tag(package.distribution)
+                       distro = await self.backend.distros.get_by_tag(package.distribution)
                        if not distro:
                                log.error("Could not find distribution '%s'" % package.distribution)
 
                                raise NoSuchDistroError(package.distribution)
 
-               pkg = self._get_package("""
+               # Insert into database
+               pkg = await self._get_package("""
                        INSERT INTO
                                packages
                        (
@@ -192,13 +186,13 @@ class Packages(base.Object):
 
                return pkg
 
-       def search(self, q, limit=None):
+       async def search(self, q, limit=None):
                """
                        Searches for packages that do match the query.
 
                        This function does not work for UUIDs or filenames.
                """
-               packages = self._get_packages("""
+               packages = await self._get_packages("""
                        WITH package_search_index AS (
                                -- Source packages
                                SELECT
@@ -270,11 +264,11 @@ class Packages(base.Object):
 
                return list(packages)
 
-       def search_by_filename(self, filename, limit=None):
+       async def search_by_filename(self, filename, limit=None):
                if "*" in filename:
                        filename = filename.replace("*", "%")
 
-                       packages = self._get_packages("""
+                       packages = await self._get_packages("""
                                SELECT
                                        DISTINCT ON (packages.name)
                                        packages.*
@@ -293,7 +287,7 @@ class Packages(base.Object):
                        )
 
                else:
-                       packages = self._get_packages("""
+                       packages = await self._get_packages("""
                                SELECT
                                        DISTINCT ON (packages.name)
                                        packages.*
@@ -338,16 +332,16 @@ class Package(base.DataObject):
                log.info("Deleting package %s" % self)
 
                # Mark as deleted
-               self._set_attribute_now("deleted_at")
+               await self._set_attribute_now("deleted_at")
                if user:
-                       self._set_attribute("deleted_by", user)
+                       await self._set_attribute("deleted_by", user)
 
                # Unlink the payload
                if self.path:
                        await self.backend.unlink(self.path)
 
                # Reset path
-               self._set_attribute("path", None)
+               await self._set_attribute("path", None)
 
        def can_be_deleted(self):
                """
@@ -509,7 +503,7 @@ class Package(base.DataObject):
                log.debug("Importing %s to %s..." % (self, path))
 
                # Store the path
-               self._set_attribute("path", path)
+               await self._set_attribute("path", path)
 
                # Copy the file if it doesn't exist, yet
                if not await self.backend.exists(path):
@@ -521,7 +515,7 @@ class Package(base.DataObject):
                # Fetch the filelist
                filelist = await asyncio.to_thread(lambda a: a.filelist, archive)
 
-               self.db.executemany("""
+               await self.db.executemany("""
                        INSERT INTO
                                package_files
                        (
@@ -567,8 +561,8 @@ class Package(base.DataObject):
                )
 
        @lazy_property
-       def builds(self):
-               builds = self.backend.builds._get_builds("""
+       async def builds(self):
+               builds = await self.backend.builds._get_builds("""
                        SELECT
                                *
                        FROM
@@ -592,21 +586,15 @@ class Package(base.DataObject):
 
        # Files
 
-       def _get_files(self, query, *args):
-               res = self.db.query(query, *args)
-
-               for row in res:
-                       yield File(self.backend, package=self, data=row)
+       def _get_files(self, *args, **kwargs):
+               return self.db.fetch_many(File, *args, package=self, **kwargs)
 
-       def _get_file(self, query, *args):
-               res = self.db.get(query, *args)
-
-               if res:
-                       return File(self.backend, package=self, data=res)
+       async def _get_file(self, *args, **kwargs):
+               return await self.db.fetch_one(File, *args, package=self, **kwargs)
 
        @lazy_property
-       def files(self):
-               files = self._get_files("""
+       async def files(self):
+               return self._get_files("""
                        SELECT
                                *
                        FROM
@@ -618,10 +606,8 @@ class Package(base.DataObject):
                        """, self.id,
                )
 
-               return list(files)
-
-       def get_file(self, path):
-               return self._get_file("""
+       async def get_file(self, path):
+               return await self._get_file("""
                        SELECT
                                *
                        FROM
@@ -633,10 +619,10 @@ class Package(base.DataObject):
                        """, self.id, path,
                )
 
-       def get_debuginfo(self, buildid):
+       async def get_debuginfo(self, buildid):
                path = buildid_to_path(buildid)
 
-               return self.get_file(path)
+               return await self.get_file(path)
 
        # Open
 
@@ -648,9 +634,8 @@ class Package(base.DataObject):
 
 
 class File(base.Object):
-       def init(self, package, data):
-               self.package = package
-               self.data = data
+       def init(self, data, package):
+               self.data, self.package = data, package
 
        def __str__(self):
                return self.path
index 6cf05d7ce8648e830dbaea724c9934f7256ce715..72dfeeb3e461d89a9c588175e40cf8e14e8c9d30 100644 (file)
@@ -78,16 +78,15 @@ class BuildExistsError(Exception):
 class Monitorings(base.Object):
        baseurl = "https://release-monitoring.org"
 
-       @property
-       def api_key(self):
-               return self.settings.get("release-monitoring-api-key")
-
        async def _request(self, method, url, data=None):
                body = {}
 
+               # Fetch the API key
+               api_key = await self.settings.get("release-monitoring-api-key")
+
                # Authenticate to the API
                headers = {
-                       "Authorization" : "Token %s" % self.api_key,
+                       "Authorization" : "Token %s" % api_key,
                }
 
                # Compose the url
@@ -120,14 +119,14 @@ class Monitorings(base.Object):
 
                return body
 
-       def _get_monitoring(self, query, *args, **kwargs):
-               return self.db.fetch_one(Monitoring, query, *args, **kwargs)
-
        def _get_monitorings(self, query, *args, **kwargs):
                return self.db.fetch_many(Monitoring, query, *args, **kwargs)
 
-       def get_by_id(self, id):
-               return self._get_monitoring("""
+       async def _get_monitoring(self, query, *args, **kwargs):
+               return await self.db.fetch_one(Monitoring, query, *args, **kwargs)
+
+       async def get_by_id(self, id):
+               return await self._get_monitoring("""
                        SELECT
                                *
                        FROM
@@ -137,8 +136,8 @@ class Monitorings(base.Object):
                        """, id,
                )
 
-       def get_by_distro_and_name(self, distro, name):
-               return self._get_monitoring("""
+       async def get_by_distro_and_name(self, distro, name):
+               return await self._get_monitoring("""
                        SELECT
                                *
                        FROM
@@ -154,7 +153,7 @@ class Monitorings(base.Object):
 
        async def create(self, distro, name, created_by, project_id,
                        follow="stable", create_builds=True):
-               monitoring = self._get_monitoring("""
+               monitoring = await self._get_monitoring("""
                        INSERT INTO
                                release_monitorings
                        (
@@ -171,6 +170,8 @@ class Monitorings(base.Object):
                        RETURNING
                                *
                        """, distro, name, created_by, project_id, follow, create_builds,
+
+                       # Populate cache
                        distro=distro,
                )
 
@@ -216,7 +217,7 @@ class Monitorings(base.Object):
                        """, limit,
                )
 
-               for monitoring in monitorings:
+               async for monitoring in monitorings:
                        await monitoring.check()
 
        # Releases
@@ -224,8 +225,8 @@ class Monitorings(base.Object):
        def _get_releases(self, query, *args, **kwargs):
                return self.db.fetch_many(Release, query, *args, **kwargs)
 
-       def _get_release(self, query, *args, **kwargs):
-               return self.db.fetch_one(Release, query, *args, **kwargs)
+       async def _get_release(self, query, *args, **kwargs):
+               return await self.db.fetch_one(Release, query, *args, **kwargs)
 
 
 class Monitoring(base.DataObject):
@@ -239,11 +240,11 @@ class Monitoring(base.DataObject):
                return "/distros/%s/monitorings/%s" % (self.distro.slug, self.name)
 
        @lazy_property
-       def distro(self):
+       async def distro(self):
                """
                        The distribution
                """
-               return self.backend.distros.get_by_id(self.data.distro_id)
+               return await self.backend.distros.get_by_id(self.data.distro_id)
 
        @property
        def name(self):
@@ -290,9 +291,9 @@ class Monitoring(base.DataObject):
 
        async def delete(self, user=None):
                # Mark as deleted
-               self._set_attribute_now("deleted_at")
+               await self._set_attribute_now("deleted_at")
                if user:
-                       self._set_attribute("deleted_by", user)
+                       await self._set_attribute("deleted_by", user)
 
                # Delete all releases
                async with asyncio.TaskGroup() as tasks:
@@ -309,9 +310,15 @@ class Monitoring(base.DataObject):
                # Fetch the current versions
                versions = await self._fetch_versions()
 
-               with self.db.transaction():
+               # Fetch the latest release
+               # XXX ???
+
+               # Fetch the latest build
+               latest_build = await self.get_latest_build()
+
+               async with await self.db.transaction():
                        # Store timestamp of this check
-                       self._set_attribute_now("last_check_at")
+                       await self._set_attribute_now("last_check_at")
 
                        try:
                                if self.follow == "latest":
@@ -319,7 +326,7 @@ class Monitoring(base.DataObject):
                                elif self.follow == "stable":
                                        release = await self._follow_stable(versions)
                                elif self.follow == "current-branch":
-                                       release = await self._follow_current_branch(versions)
+                                       release = await self._follow_current_branch(versions, latest_build)
                                else:
                                        raise ValueError("Cannot handle follow: %s" % self.follow)
 
@@ -349,34 +356,34 @@ class Monitoring(base.DataObject):
                # Parse the response as JSON and return it
                return database.Row(response)
 
-       async def _follow_stable(self, versions):
+       async def _follow_stable(self, versions, *, build):
                """
                        This will follow "stable" i.e. the latest stable version
                """
                for version in versions.stable_versions:
-                       return await self.create_release(version)
+                       return await self.create_release(version, build=build)
 
-       async def _follow_latest(self, versions):
+       async def _follow_latest(self, versions, * build):
                """
                        This will follow the latest version (including pre-releases)
                """
-               return await self.create_release(versions.latest_version)
+               return await self.create_release(versions.latest_version, build=build)
 
-       async def _follow_current_branch(self, versions):
+       async def _follow_current_branch(self, versions, *, build):
                """
                        This will follow any minor releases in the same branch
                """
                # We cannot perform this if there is no recent build
-               if not self.latest_build:
+               if not build:
                        return
 
                # Find the next version
                next_version = self._find_next_version(
-                       self.latest_build.pkg.evr, versions.stable_versions)
+                       latest_build.pkg.evr, versions.stable_versions)
 
                # Create a new release with the next version
                if next_version:
-                       return await self.create_release(next_version)
+                       return await self.create_release(next_version, build=build)
 
        def _find_next_version(self, current_version, available_versions):
                # Remove epoch
@@ -429,16 +436,16 @@ class Monitoring(base.DataObject):
                return self.backend.monitorings._get_releases(query, *args,
                        monitoring=self, **kwargs)
 
-       def _get_release(self, query, *args, **kwargs):
-               return self.backend.monitorings._get_release(query, *args,
+       async def _get_release(self, query, *args, **kwargs):
+               return await self.backend.monitorings._get_release(query, *args,
                        monitoring=self, **kwargs)
 
        @property
-       def latest_release(self):
+       async def latest_release(self):
                """
                        Returns the latest release of this package
                """
-               return self._get_release("""
+               return await self._get_release("""
                        SELECT
                                *
                        FROM
@@ -453,7 +460,7 @@ class Monitoring(base.DataObject):
 
        @lazy_property
        def releases(self):
-               releases = self._get_releases("""
+               return self._get_releases("""
                        SELECT
                                *
                        FROM
@@ -467,20 +474,20 @@ class Monitoring(base.DataObject):
 
                return list(releases)
 
-       def _release_exists(self, version):
+       async def _release_exists(self, version):
                """
                        Returns True if this version already exists
                """
-               return version in [release.version for release in self.releases]
+               return version in [release.version async for release in self.releases]
 
-       async def create_release(self, version):
+       async def create_release(self, version, *, build):
                """
                        Creates a new release for this package
                """
                # XXX Do we need to check whether we are going backwards?
 
                # Raise an error if the release already exists
-               if self._release_exists(version):
+               if await self._release_exists(version):
                        raise ReleaseExistsError(version)
 
                # Raise an error if we already have a newer build
@@ -489,7 +496,7 @@ class Monitoring(base.DataObject):
 
                log.info("%s: Creating new release %s" % (self, version))
 
-               release = self._get_release("""
+               release = await self._get_release("""
                        INSERT INTO
                                release_monitoring_releases
                        (
@@ -511,8 +518,7 @@ class Monitoring(base.DataObject):
                # Create a build
                if self.data.create_builds:
                        await release._create_build(
-                               build=self.latest_build,
-                               owner=self.backend.users.pakfire,
+                               build=build, owner=self.backend.users.pakfire,
                        )
 
                # Create a bug report
@@ -537,11 +543,10 @@ class Monitoring(base.DataObject):
 
                return False
 
-       @lazy_property
-       def latest_build(self):
-               builds = self.distro.get_builds_by_name(self.name, limit=1)
+       async def get_latest_build(self):
+               distro = await self.distro
 
-               for build in builds:
+               async for build in distro.get_builds_by_name(self.name, limit=1):
                        return build
 
 
@@ -629,11 +634,11 @@ class Release(base.DataObject):
                                        {
                                                "name"    : self.monitoring.name,
                                                "version" : self.version,
-                                               "url"     : self.backend.url_to(self.build.url),
+                                               "url"     : await self.backend.url_to(self.build.url),
                                        },
 
                                # Set the URL to point to the build
-                               "url" : self.backend.url_to(self.build.url),
+                               "url" : await self.backend.url_to(self.build.url),
                        }
 
                # Create the bug
index 61a7de172cca278ec2f8ffb2181e96ebcb241fd5..105a551bb3954fddfbca4ceed31bba5697051ccb 100644 (file)
@@ -20,14 +20,14 @@ from .decorators import *
 log = logging.getLogger("pbs.repositories")
 
 class Repositories(base.Object):
-       def _get_repositories(self, query, *args, **kwargs):
-               return self.db.fetch_many(Repository, query, *args, **kwargs)
+       def _get_repositories(self, *args, **kwargs):
+               return self.db.fetch_many(Repository, *args, **kwargs)
 
-       def _get_repository(self, query, *args, **kwargs):
-               return self.db.fetch_one(Repository, query, *args, **kwargs)
+       async def _get_repository(self, *args, **kwargs):
+               return await self.db.fetch_one(Repository, *args, **kwargs)
 
-       def __iter__(self):
-               repositories = self._get_repositories("""
+       async def __aiter__(self):
+               repositories = await self._get_repositories("""
                        SELECT
                                *
                        FROM
@@ -39,14 +39,14 @@ class Repositories(base.Object):
                        """,
                )
 
-               return iter(repositories)
+               return aiter(repositories)
 
        @property
-       def mirrored(self):
+       async def mirrored(self):
                """
                        Lists all repositories that should be mirrored
                """
-               repos = self._get_repositories("""
+               repos = await self._get_repositories("""
                        SELECT
                                *
                        FROM
@@ -65,7 +65,7 @@ class Repositories(base.Object):
                        Creates a new repository
                """
                # Generate a slug
-               slug = self._make_slug(name, owner=owner)
+               slug = await self._make_slug(name, owner=owner)
 
                # Generate a comment for the key
                comment = "%s - %s" % (distro, name)
@@ -75,7 +75,7 @@ class Repositories(base.Object):
                # Create a key for this repository
                key = await self.backend.keys.create(comment=comment, user=owner)
 
-               repo = self._get_repository("""
+               repo = await self._get_repository("""
                        INSERT INTO
                                repositories
                        (
@@ -101,17 +101,17 @@ class Repositories(base.Object):
 
                return repo
 
-       def _make_slug(self, name, owner=None):
+       async def _make_slug(self, name, owner=None):
                for i in misc.infinity():
                        slug = misc.normalize(name, iteration=i)
 
-                       exists = self.db.get("SELECT 1 FROM repositories \
+                       exists = await self.db.get("SELECT 1 FROM repositories \
                                WHERE deleted_at IS NULL AND owner_id = %s AND slug = %s", owner, slug)
                        if not exists:
                                return slug
 
-       def get_by_id(self, repo_id):
-               return self._get_repository("SELECT * FROM repositories \
+       async def get_by_id(self, repo_id):
+               return await self._get_repository("SELECT * FROM repositories \
                        WHERE id = %s", repo_id)
 
        async def write(self):
@@ -539,7 +539,7 @@ class Repository(base.DataObject):
                return list(builds)
 
        def get_recent_builds(self, limit=None, offset=None):
-               builds = self.backend.builds._get_builds("""
+               return self.backend.builds._get_builds("""
                        SELECT
                                builds.*
                        FROM
@@ -561,10 +561,8 @@ class Repository(base.DataObject):
                        """, self.id, limit, offset,
                )
 
-               return list(builds)
-
-       def get_added_at_for_build(self, build):
-               res = self.db.get("""
+       async def get_added_at_for_build(self, build):
+               res = await self.db.get("""
                        SELECT
                                added_at
                        FROM
@@ -601,11 +599,11 @@ class Repository(base.DataObject):
 
                return res.count or 0
 
-       def get_builds_by_name(self, name):
+       async def get_builds_by_name(self, name):
                """
                        Returns an ordered list of all builds that match this name
                """
-               builds = self.backend.builds._get_builds("""
+               builds = await self.backend.builds._get_builds("""
                        SELECT
                                builds.*
                        FROM
@@ -627,9 +625,9 @@ class Repository(base.DataObject):
 
                return list(builds)
 
-       def get_packages(self, arch):
+       async def get_packages(self, arch):
                if arch == "src":
-                       packages = self.backend.packages._get_packages("""
+                       packages = await self.backend.packages._get_packages("""
                                SELECT
                                        packages.*
                                FROM
@@ -649,7 +647,7 @@ class Repository(base.DataObject):
                                """, self.id,
                        )
                else:
-                       packages = self.backend.packages._get_packages("""
+                       packages = await self.backend.packages._get_packages("""
                                SELECT
                                        packages.*
                                FROM
@@ -681,12 +679,12 @@ class Repository(base.DataObject):
                return list(packages)
 
        @property
-       def pending_jobs(self):
+       async def pending_jobs(self):
                """
                        Returns a list of all pending jobs that use this repository
                        as their build repository.
                """
-               return self.backend.jobs._get_jobs("""
+               return await self.backend.jobs._get_jobs("""
                        SELECT
                                jobs.*
                        FROM
@@ -717,8 +715,8 @@ class Repository(base.DataObject):
        # Stats
 
        @lazy_property
-       def size(self):
-               res = self.db.query("""
+       async def size(self):
+               res = await self.db.query("""
                        SELECT
                                packages.arch AS arch,
                                SUM(packages.filesize) AS size
@@ -750,8 +748,8 @@ class Repository(base.DataObject):
                return { row.arch : row.size for row in res if row.arch in self.distro.arches }
 
        @lazy_property
-       def total_size(self):
-               res = self.db.get("""
+       async def total_size(self):
+               res = await self.db.get("""
                        WITH packages AS (
                                -- Source Packages
                                SELECT
@@ -1010,16 +1008,16 @@ class Repository(base.DataObject):
                        Deletes this repository
                """
                # Mark as deleted
-               self._set_attribute_now("deleted_at")
+               await self._set_attribute_now("deleted_at")
                if user:
-                       self._set_attribute("deleted_by", user)
+                       await self._set_attribute("deleted_by", user)
 
                # Delete the key
                if self.key:
-                       self.key.delete()
+                       await self.key.delete()
 
                # Remove all builds from this repository
-               self.db.execute("""
+               await self.db.execute("""
                        UPDATE
                                repository_builds
                        SET
index 0f4ec1be2d28ce3b37feec6f019a308a49c982e5..68eba69484516d5540e097336969a9fe4d667190 100644 (file)
@@ -6,25 +6,19 @@ from . import misc
 from .decorators import *
 
 class Sessions(base.Object):
-       def _get_session(self, query, *args):
-               res = self.db.get(query, *args)
+       async def _get_sessions(self, *args, **kwargs):
+               return self.db.fetch_many(Session, *args, **kwargs)
 
-               if res:
-                       return Session(self.backend, res.id, data=res)
+       async def _get_session(self, *args, **kwargs):
+               return self.db.fetch_one(Session, *args, **kwargs)
 
-       def _get_sessions(self, query, *args):
-               res = self.db.query(query, *args)
-
-               for row in res:
-                       yield Session(self.backend, row.id, data=row)
-
-       def __iter__(self):
-               sessions = self._get_sessions("SELECT * FROM sessions \
+       async def __aiter__(self):
+               sessions = await self._get_sessions("SELECT * FROM sessions \
                        WHERE valid_until >= NOW() ORDER BY valid_until DESC")
 
-               return iter(sessions)
+               return aiter(sessions)
 
-       def create(self, user, address, user_agent=None):
+       async def create(self, user, address, user_agent=None):
                """
                        Creates a new session in the data.
 
@@ -33,11 +27,26 @@ class Sessions(base.Object):
                """
                session_id = misc.generate_random_string(48)
 
-               return self._get_session("INSERT INTO sessions(session_id, user_id, address, user_agent) \
-                       VALUES(%s, %s, %s, %s) RETURNING *", session_id, user.id, address, user_agent)
-
-       def get_by_session_id(self, session_id):
-               return self._get_session("SELECT * FROM sessions \
+               return await self._get_session("""
+                       INSERT INTO
+                               sessions
+                       (
+                               session_id,
+                               user_id,
+                               address,
+                               user_agent
+                       )
+                       VALUES
+                       (
+                               %s, %s, %s, %s
+                       )
+                       RETURNING
+                               *
+                       """, session_id, user.id, address, user_agent,
+               )
+
+       async def get_by_session_id(self, session_id):
+               return await self._get_session("SELECT * FROM sessions \
                        WHERE session_id = %s AND valid_until >= NOW()", session_id)
 
        # Alias function
@@ -47,8 +56,8 @@ class Sessions(base.Object):
                """
                        Deletes all sessions that are not valid any more
                """
-               with self.db.transaction():
-                       self.db.execute("DELETE FROM sessions WHERE valid_until < CURRENT_TIMESTAMP")
+               async with self.db.transaction():
+                       await self.db.execute("DELETE FROM sessions WHERE valid_until < CURRENT_TIMESTAMP")
 
 
 class Session(base.DataObject):
@@ -60,8 +69,8 @@ class Session(base.DataObject):
 
                return NotImplemented
 
-       def destroy(self):
-               self.db.execute("DELETE FROM sessions WHERE id = %s", self.id)
+       async def destroy(self):
+               await self.db.execute("DELETE FROM sessions WHERE id = %s", self.id)
 
        @property
        def session_id(self):
index e71fde1be8054fefc6f669149bbb6940e30434f2..2f6b457834f9f614b53d3a412433d221285b297b 100644 (file)
@@ -3,31 +3,31 @@
 from . import base
 
 class Settings(base.Object):
-       def get(self, key, default=None):
-               res = self.db.get("SELECT v FROM settings WHERE k = %s", key)
+       async def get(self, key, default=None):
+               res = await self.db.get("SELECT v FROM settings WHERE k = %s", key)
                if res:
                        return res.v
 
                return default
 
-       def get_int(self, key, default=None):
-               value = self.get(key, default)
+       async def get_int(self, key, default=None):
+               value = await self.get(key, default)
 
                try:
                        return int(value)
                except ValueError:
                        return None
 
-       def get_float(self, key, default=None):
-               value = self.get(key, default)
+       async def get_float(self, key, default=None):
+               value = await self.get(key, default)
 
                try:
                        return float(value)
                except ValueError:
                        return None
 
-       def set(self, key, value):
-               self.db.execute("""
+       async def set(self, key, value):
+               await self.db.execute("""
                        INSERT INTO
                                settings(
                                        k,
@@ -44,3 +44,4 @@ class Settings(base.Object):
                                settings.k = excluded.k
                        """, key, value,
                )
+
index 6dbec0e7dc571bffc884418a1f149c57f0f1adb8..e1a48e3287feb995d0de1a5af80d5dc8352d8313 100644 (file)
@@ -33,7 +33,13 @@ VALID_TAGS = (
 )
 
 class Sources(base.Object):
-       def __iter__(self):
+       def _get_sources(self, query, *args, **kwargs):
+               return self.db.fetch_many(Source, query, *args, **kwargs)
+
+       async def _get_source(self, query, *args, **kwargs):
+               return await self.db.fetch_one(Source, query, *args, **kwargs)
+
+       def __aiter__(self):
                sources = self._get_sources("""
                        SELECT
                                *
@@ -46,16 +52,10 @@ class Sources(base.Object):
                        """,
                )
 
-               return iter(sources)
-
-       def _get_sources(self, query, *args, **kwargs):
-               return self.db.fetch_many(Source, query, *args, **kwargs)
-
-       def _get_source(self, query, *args, **kwargs):
-               return self.db.fetch_one(Source, query, *args, **kwargs)
+               return aiter(sources)
 
-       def get_by_id(self, id):
-               return self._get_source("""
+       async def get_by_id(self, id):
+               return await self._get_source("""
                        SELECT
                                *
                        FROM
@@ -65,8 +65,8 @@ class Sources(base.Object):
                        """, id,
                )
 
-       def get_by_slug(self, slug):
-               return self._get_source("""
+       async def get_by_slug(self, slug):
+               return await self._get_source("""
                        SELECT
                                *
                        FROM
@@ -78,12 +78,12 @@ class Sources(base.Object):
                        """, slug,
                )
 
-       def create(self, repo, name, url, user):
+       async def create(self, repo, name, url, user):
                # Make slug
                slug = self._make_slug(name)
 
                # Insert into the database
-               return self._get_source("""
+               source = await self._get_source("""
                        INSERT INTO
                                sources(
                                        name,
@@ -97,8 +97,13 @@ class Sources(base.Object):
                        RETURNING
                                *
                        """, name, url, user, repo,
+
+                       # Populate cache
+                       repo=repo,
                )
 
+               return source
+
        def _make_slug(self, name):
                for i in misc.infinity():
                        slug = misc.normalize(name, iteration=i)
@@ -115,11 +120,11 @@ class Sources(base.Object):
        def _get_commits(self, query, *args, **kwargs):
                return self.db.fetch_many(Commit, query, *args, **kwargs)
 
-       def _get_commit(self, query, *args, **kwargs):
-               return self.db.fetch_one(Commit, query, *args, **kwargs)
+       async def _get_commit(self, query, *args, **kwargs):
+               return await self.db.fetch_one(Commit, query, *args, **kwargs)
 
-       def get_commit_by_id(self, id):
-               return self._get_commit("""
+       async def get_commit_by_id(self, id):
+               return await self._get_commit("""
                        SELECT
                                *
                        FROM
@@ -149,8 +154,8 @@ class Sources(base.Object):
        def _get_jobs(self, query, *args, **kwargs):
                return self.db.fetch_many(Job, query, *args, **kwargs)
 
-       def _get_job(self, query, *args, **kwargs):
-               return self.db.fetch_one(Job, query, *args, **kwargs)
+       async def _get_job(self, query, *args, **kwargs):
+               return await self.db.fetch_one(Job, query, *args, **kwargs)
 
        @property
        def pending_jobs(self):
@@ -182,7 +187,7 @@ class Sources(base.Object):
                        Runs all unfinished jobs of all sources
                """
                # Run jobs one after the other
-               for job in self.pending_jobs:
+               async for job in self.pending_jobs:
                        await job.run()
 
 
@@ -616,14 +621,14 @@ class Commit(base.DataObject):
 
        # Jobs
 
-       def _create_job(self, action, name):
+       async def _create_job(self, action, name):
                """
                        Creates a new job
                """
                log.info("%s: %s: Created '%s' job for '%s'" \
                        % (self.source, self.revision, action, name))
 
-               job = self.backend.sources._get_job("""
+               job = await self.backend.sources._get_job("""
                        INSERT INTO
                                source_commit_jobs
                        (
index cfac0a342e157c446e1bd300fa173c68d9a6b115..2e0039e7b5ba998a3bc72fb062b07f60dbc2d766 100644 (file)
@@ -26,26 +26,20 @@ class UnsupportedDigestException(ValueError):
        pass
 
 class Uploads(base.Object):
-       def _get_upload(self, query, *args):
-               res = self.db.get(query, *args)
+       async def _get_uploads(self, *args, **kwargs):
+               return await self.db.fetch_many(Upload, *args, **kwargs)
 
-               if res:
-                       return Upload(self.backend, res.id, data=res)
+       async def _get_upload(self, *args, **kwargs):
+               return await self.db.fetch_one(Upload, *args, **kwargs)
 
-       def _get_uploads(self, query, *args):
-               res = self.db.query(query, *args)
-
-               for row in res:
-                       yield Upload(self.backend, row.id, data=row)
-
-       def __iter__(self):
-               uploads = self._get_uploads("SELECT * FROM uploads \
+       async def __aiter__(self):
+               uploads = await self._get_uploads("SELECT * FROM uploads \
                        ORDER BY created_at DESC")
 
-               return iter(uploads)
+               return aiter(uploads)
 
-       def get_by_uuid(self, uuid):
-               return self._get_upload("""
+       async def get_by_uuid(self, uuid):
+               return await self._get_upload("""
                        SELECT
                                *
                        FROM
@@ -81,7 +75,7 @@ class Uploads(base.Object):
                        user.check_storage_quota(size)
 
                # Allocate a new temporary file
-               upload = self._get_upload("""
+               upload = await self._get_upload("""
                        INSERT INTO
                                uploads
                        (
@@ -135,7 +129,7 @@ class Uploads(base.Object):
 
        async def cleanup(self):
                # Find all expired uploads
-               uploads = self._get_uploads("""
+               uploads = await self._get_uploads("""
                        SELECT
                                *
                        FROM
@@ -222,7 +216,7 @@ class Upload(base.DataObject):
                        await self.backend.unlink(self.path)
 
                # Delete the upload from the database
-               self.db.execute("DELETE FROM uploads WHERE id = %s", self.id)
+               await self.db.execute("DELETE FROM uploads WHERE id = %s", self.id)
 
        @property
        def created_at(self):
index 7ad7417942706a0c7c3ee3e7dee3d79d21fe32f1..75773067c6c61170e2061ad007134102898c95b9 100644 (file)
@@ -117,20 +117,14 @@ class Users(base.Object):
 
                return self.local.ldap
 
-       def _get_user(self, query, *args):
-               res = self.db.get(query, *args)
-
-               if res:
-                       return User(self.backend, res.id, data=res)
-
-       def _get_users(self, query, *args):
-               res = self.db.query(query, *args)
+       def _get_users(self, *args, **kwargs):
+               return self.db.fetch_many(User, *args, **kwargs)
 
-               for row in res:
-                       yield User(self.backend, row.id, data=row)
+       def _get_user(self, *args, **kwargs):
+               return self.db.fetch_one(User, *args, **kwargs)
 
-       def __iter__(self):
-               users = self._get_users("""
+       async def __aiter__(self):
+               users = await self._get_users("""
                        SELECT
                                *
                        FROM
@@ -142,20 +136,7 @@ class Users(base.Object):
                        """,
                )
 
-               return iter(users)
-
-       def __len__(self):
-               res = self.db.get("""
-                       SELECT
-                               COUNT(*) AS count
-                       FROM
-                               users
-                       WHERE
-                               deleted_at IS NULL
-                       """,
-               )
-
-               return res.count
+               return aiter(users)
 
        def _ldap_query(self, query, attrlist=None, limit=0, search_base=None):
                search_base = self.backend.config.get("ldap", "base")
@@ -216,7 +197,7 @@ class Users(base.Object):
 
                return results[0]
 
-       def create(self, name, notify=False, storage_quota=None, _attrs=None):
+       async def create(self, name, notify=False, storage_quota=None, _attrs=None):
                """
                        Creates a new user
                """
@@ -228,7 +209,8 @@ class Users(base.Object):
                if storage_quota is None:
                        storage_quota = DEFAULT_STORAGE_QUOTA
 
-               user = self._get_user("""
+               # Insert into database
+               user = await self._get_user("""
                        INSERT INTO
                                users
                        (
@@ -249,20 +231,19 @@ class Users(base.Object):
 
                # Send a welcome email
                if notify:
-                       user._send_welcome_email()
+                       await user._send_welcome_email()
 
                return user
 
-       def get_by_id(self, id):
-               return self._get_user("SELECT * FROM users \
-                       WHERE id = %s", id)
+       async def get_by_id(self, id):
+               return await self._get_user("SELECT * FROM users WHERE id = %s", id)
 
-       def get_by_name(self, name):
+       async def get_by_name(self, name):
                """
                        Fetch a user by its username
                """
                # Try to find a local user
-               user = self._get_user("""
+               user = await self._get_user("""
                        SELECT
                                *
                        FROM
@@ -298,7 +279,7 @@ class Users(base.Object):
                # Create a new user
                return self.create(uid)
 
-       def get_by_email(self, mail):
+       async def get_by_email(self, mail):
                # Strip any excess stuff from the email address
                name, mail = email.utils.parseaddr(mail)
 
@@ -330,16 +311,16 @@ class Users(base.Object):
                # Fetch the UID
                uid = res.get("uid")[0].decode()
 
-               return self.get_by_name(uid)
+               return await self.get_by_name(uid)
 
-       def _search_by_email(self, mails, include_missing=True):
+       async def _search_by_email(self, mails, include_missing=True):
                """
                        Takes a list of email addresses and returns all users that could be found
                """
                users = []
 
                for mail in mails:
-                       user = self.get_by_email(mail)
+                       user = await self.get_by_email(mail)
 
                        # Include the search string if no user could be found
                        if not user and include_missing:
@@ -353,7 +334,7 @@ class Users(base.Object):
 
                return users
 
-       def search(self, q, limit=None):
+       async def search(self, q, limit=None):
                # Do nothing in test mode
                if self.backend.test:
                        log.warning("Cannot search for users in test mode")
@@ -374,7 +355,7 @@ class Users(base.Object):
                )
 
                # Fetch users
-               users = self._get_users("""
+               users = await self._get_users("""
                        SELECT
                                *
                        FROM
@@ -402,11 +383,11 @@ class Users(base.Object):
                return user
 
        @property
-       def top(self):
+       async def top(self):
                """
                        Returns the top users (with the most builds in the last year)
                """
-               users = self._get_users("""
+               users = await self._get_users("""
                        SELECT
                                DISTINCT users.*,
                                COUNT(builds.id) AS _sort
@@ -532,8 +513,8 @@ class User(base.DataObject):
        def name(self):
                return self.data.name
 
-       def delete(self):
-               self._set_attribute("deleted", True)
+       async def delete(self):
+               await self._set_attribute("deleted", True)
 
                # Destroy all sessions
                for session in self.sessions:
@@ -579,19 +560,19 @@ class User(base.DataObject):
                        self.email or "invalid@invalid.tld",
                ))
 
-       def send_email(self, *args, **kwargs):
-               return self.backend.messages.send_template(
+       async def send_email(self, *args, **kwargs):
+               return await self.backend.messages.send_template(
                        *args,
                        recipient=self,
                        locale=self.locale,
                        **kwargs,
                )
 
-       def _send_welcome_email(self):
+       async def _send_welcome_email(self):
                """
                        Sends a welcome email to the user
                """
-               self.send_email("users/messages/welcome.txt")
+               await self.send_email("users/messages/welcome.txt")
 
        def is_admin(self):
                return self.data.admin is True
@@ -644,24 +625,6 @@ class User(base.DataObject):
                # No permission
                return False
 
-       @property
-       def sessions(self):
-               sessions = self.backend.sessions._get_sessions("""
-                       SELECT
-                               *
-                       FROM
-                               sessions
-                       WHERE
-                               user_id = %s
-                       AND
-                               valid_until >= NOW()
-                       ORDER BY
-                               created_at
-                       """, self.id,
-               )
-
-               return list(sessions)
-
        # Bugzilla
 
        async def connect_to_bugzilla(self, api_key):
@@ -854,14 +817,14 @@ class User(base.DataObject):
 
        # Builds
 
-       def get_builds(self, name=None, limit=None, offset=None):
+       async def get_builds(self, name=None, limit=None, offset=None):
                """
                        Returns builds by a certain user
                """
                if name:
-                       return self.get_builds_by_name(name, limit=limit, offset=offset)
+                       return await self.get_builds_by_name(name, limit=limit, offset=offset)
 
-               builds = self.backend.builds._get_builds("""
+               builds = await self.backend.builds._get_builds("""
                        SELECT
                                *
                        FROM
@@ -883,11 +846,11 @@ class User(base.DataObject):
 
                return list(builds)
 
-       def get_builds_by_name(self, name, limit=None, offset=None):
+       async def get_builds_by_name(self, name, limit=None, offset=None):
                """
                        Fetches all builds matching name
                """
-               builds = self.backend.builds._get_builds("""
+               builds = await self.backend.builds._get_builds("""
                        SELECT
                                builds.*
                        FROM
@@ -1154,11 +1117,11 @@ class UserPushSubscription(base.DataObject):
        def deleted_at(self):
                return self.data.deleted_at
 
-       def delete(self):
+       async def delete(self):
                """
                        Deletes this subscription
                """
-               self._set_attribute_now("deleted_at")
+               await self._set_attribute_now("deleted_at")
 
        @property
        def endpoint(self):
index 871fd5a364678dd390817142eb5d3f0b4c26606f..5a8bfff05a104a7be77fd66215ce9d91cbb89dd2 100644 (file)
@@ -1,4 +1,4 @@
 MAILTO=pakfire@ipfire.org
 
 # Send queued emails once a minute
-* * * * *              _pakfire        pakfire-build-service --logging=warning messages:queue:send
+#* * * * *             _pakfire        pakfire-build-service --logging=warning messages:queue:send
index 84f6ddeecb6b2d708514044ed3afa98cc26b17b4..7566cd6e360ee7cc019caa2b5dff65e914ff7ebf 100644 (file)
@@ -68,8 +68,8 @@
                <div class="container">
                        <h4 class="title is-4">{{ _("Latest Release") }}</h4>
 
-                       {% if distro.latest_release %}
-                               {% module ReleasesList([distro.latest_release]) %}
+                       {% if latest_release %}
+                               {% module ReleasesList([latest_release]) %}
                        {% else %}
                                <p class="notification">
                                        {{ _("No release, yet") }}
 
        {# Repositories #}
 
-       {% if distro.repos %}
+       {% if repos %}
                <section class="section">
                        <div class="container">
                                <h4 class="title is-4">{{ _("Repositories") }}</h4>
 
-                               {% module ReposList(distro.repos) %}
+                               {% module ReposList(repos) %}
                        </div>
                </section>
        {% end %}
 
        {# Sources #}
 
-       {% if distro.sources %}
+       {% if sources %}
                <section class="section">
                        <div class="container">
                                <h4 class="title is-4">{{ _("Sources") }}</h4>
 
-                               {% module SourcesList(distro.sources) %}
+                               {% module SourcesList(sources) %}
                        </div>
                </section>
        {% end %}
index 556eb91faa6d426dfcdee171be587e0f5826695c..a8c70082ba7fa9a9689094ce1a8b09b7ec5fc0ce 100644 (file)
@@ -1,6 +1,6 @@
 <div class="block">
        <nav class="panel is-link">
-               {% for repo in repos %}
+               {% for repo in sorted(repos) %}
                        <a class="panel-block is-justify-content-space-between p-4" href="{{ repo.url }}">
                                <h5 class="title is-5 mb-0">
                                        {% if repo.owner %}
                                        <span class="has-text-grey">
                                                {{ _("Added %s") % locale.format_date(t, shorter=True) }}
                                        </span>
-                               {% else %}
-                                       <span class="tag">
-                                               {{ _("One Build", "%(num)s Builds", repo.total_builds) % { "num" : repo.total_builds } }}
-                                       </span>
                                {% end %}
                        </a>
                {% end %}
index 33d22b2b6e5f79537a097518642c5998a8eb99ea..b9b0d248f44c3974ffff9d658fa63596bf528f03 100644 (file)
@@ -16,7 +16,7 @@ class LoginHandler(base.KerberosAuthMixin, base.BaseHandler):
                self.render("login.html", username=username, failed=failed)
 
        @base.ratelimit(requests=10, minutes=5)
-       def post(self):
+       async def post(self):
                # Fetch credentials
                username = self.get_argument("username")
                password = self.get_argument("password")
@@ -26,14 +26,14 @@ class LoginHandler(base.KerberosAuthMixin, base.BaseHandler):
                        return self.get(username=username, failed=True)
 
                # If the authentication was successful, we create a new session
-               with self.db.transaction():
+               async with self.db.transaction():
                        # Fetch the authenticated user
-                       user = self.backend.users.get_by_name(username)
+                       user = await self.backend.users.get_by_name(username)
                        if not user:
                                raise tornado.web.HTTPError(500, "Could not find user %s" % username)
 
                        # Create a new session
-                       session = self.backend.sessions.create(user,
+                       session = await self.backend.sessions.create(user,
                                self.current_address, user_agent=self.user_agent)
 
                # Send the session cookie to the browser
@@ -47,12 +47,12 @@ class LoginHandler(base.KerberosAuthMixin, base.BaseHandler):
 
 
 class LogoutHandler(base.BaseHandler):
-       @tornado.web.authenticated
-       def get(self):
+       @base.authenticated
+       async def get(self):
                # Destroy the user's session.
-               with self.db.transaction():
+               async with self.db.transaction():
                        # Destroy the session
-                       self.session.destroy()
+                       await self.session.destroy()
 
                # Remove the session cookie
                self.clear_cookie("session_id")
index 91e1855ee1234a149cf529d9ba66c89202adbb85..7b8d65f0c415910f78274f859a33206efa44d924 100644 (file)
@@ -14,6 +14,7 @@ import tornado.locale
 import tornado.web
 import tornado.websocket
 import traceback
+import urllib.parse
 
 from .. import __version__
 from .. import builders
@@ -167,18 +168,19 @@ class BaseHandler(tornado.web.RequestHandler):
        def db(self):
                return self.backend.db
 
-       @lazy_property
-       def session(self):
+       async def _get_current_user(self):
                # Get the session from the cookie
                session_id = self.get_cookie("session_id", None)
+               if not session_id:
+                       return
 
-               # Search for a valid database session
-               if session_id:
-                       return self.backend.sessions.get(session_id)
+               # Fetch the session
+               session = await self.backend.sessions.get(session_id)
+               if not session:
+                       return
 
-       def get_current_user(self):
-               if self.session:
-                       return self.session.user
+               # Return the user
+               return await session.get_user()
 
        def get_user_locale(self):
                # Get the locale from the user settings
@@ -429,6 +431,43 @@ class APIMixin(KerberosAuthMixin):
                return message
 
 
+def authenticated(method):
+       """
+               This is our custom authentication wrapper which supports an
+               asynchronous implementation of "get_current_user()".
+       """
+       @functools.wraps(method)
+       async def wrapper(self, *args, **kwargs):
+               self.current_user = await self._get_current_user()
+
+               if not self.current_user:
+                       if self.request.method in ("GET", "HEAD"):
+                               url = self.get_login_url()
+                               if "?" not in url:
+                                       if urllib.parse.urlsplit(url).scheme:
+                                               # if login url is absolute, make next absolute too
+                                               next_url = self.request.full_url()
+                                       else:
+                                               assert self.request.uri is not None
+                                               next_url = self.request.uri
+                                       url += "?" + urllib.parse.urlencode(dict(next=next_url))
+                               self.redirect(url)
+                               return None
+
+                       # Authentication has failed
+                       raise tornado.web.HTTPError(403)
+
+               # Call the wrapped method
+               result = method(self, *args, **kwargs)
+
+               # Support coroutines
+               if asyncio.iscoroutine(result):
+                       result = await result
+
+               return result
+
+       return wrapper
+
 def negotiate(method):
        """
                Requires clients to use SPNEGO
index 244c440b238783140af5c72e0f972b432822873b..b10e4e368f9cbe3ce3e74b30c0cfee60eeef5ea9 100644 (file)
@@ -50,7 +50,7 @@ class APIv1ControlHandler(base.APIMixin, base.BackendMixin, tornado.websocket.We
                """
                        Handles stats messages
                """
-               with self.db.transaction():
+               async with await self.db.transaction():
                        await self.builder.log_stats(**data)
 
 
@@ -110,7 +110,7 @@ class CreateHandler(base.BaseHandler):
                self.render("builders/create.html")
 
        @tornado.web.authenticated
-       def post(self):
+       async def post(self):
                # Must be admin
                if not self.current_user.is_admin():
                        raise tornado.web.HTTPError(403)
@@ -118,7 +118,7 @@ class CreateHandler(base.BaseHandler):
                hostname = self.get_argument("hostname")
 
                # Create a new builder
-               with self.db.transaction():
+               async with await self.db.transaction():
                        builder = self.backend.builders.create(hostname, user=self.current_user)
 
                self.redirect("/builders/%s/edit" % builder.hostname)
@@ -126,8 +126,8 @@ class CreateHandler(base.BaseHandler):
 
 class BuilderEditHandler(base.BaseHandler):
        @tornado.web.authenticated
-       def get(self, hostname):
-               builder = self.backend.builders.get_by_name(hostname)
+       async def get(self, hostname):
+               builder = await self.backend.builders.get_by_name(hostname)
                if not builder:
                        raise tornado.web.HTTPError(404, "Builder not found")
 
@@ -147,7 +147,7 @@ class BuilderEditHandler(base.BaseHandler):
                if not builder.has_perm(self.current_user):
                        raise tornado.web.HTTPError(403)
 
-               with self.db.transaction():
+               async with await self.db.transaction():
                        builder.enabled     = self.get_argument("enabled", False)
                        builder.maintenance = self.get_argument_bool("maintenance")
                        builder.max_jobs    = self.get_argument_int("max_jobs")
@@ -182,15 +182,15 @@ class DeleteHandler(base.BaseHandler):
                        raise tornado.web.HTTPError(403)
 
                # Delete the builder
-               with self.db.transaction():
-                       builder.delete(user=self.current_user)
+               async with await self.db.transaction():
+                       await builder.delete(user=self.current_user)
 
                self.redirect("/builders")
 
 
 class StartHandler(base.BaseHandler):
        @tornado.web.authenticated
-       def get(self, name):
+       async def get(self, name):
                builder = self.backend.builders.get_by_name(name)
                if not builder:
                        raise tornado.web.HTTPError(404, "Builder not found: %s" % name)
index be8bef0d173d70aab70cb45da3ccfe25a3398f7c..2860d82bb0b500cbfa1c450dc3559efb01e4a6ad 100644 (file)
@@ -70,7 +70,7 @@ class APIv1IndexHandler(base.APIMixin, base.BaseHandler):
 
 
 class IndexHandler(base.BaseHandler):
-       def get(self):
+       async def get(self):
                # Pagination
                offset = self.get_argument_int("offset", None) or 0
                limit  = self.get_argument_int("limit", None) or 25
@@ -86,7 +86,7 @@ class IndexHandler(base.BaseHandler):
                        builds = self.backend.builds.get_recent(name=name, limit=limit, offset=offset)
 
                # Group builds by date
-               builds = misc.group(builds, lambda build: build.created_at.date())
+               builds = await misc.group(builds, lambda build: build.created_at.date())
 
                self.render("builds/index.html", builds=builds, name=name, user=user,
                        limit=limit, offset=offset)
index 5391360dab52117797a40af1e8bc5885867a42b8..0dd2fd6d6a8ff1bab60dab76c5500f2c8078455b 100644 (file)
@@ -1,28 +1,42 @@
 #!/usr/bin/python
 
+import asyncio
 import tornado.web
 
 from . import base
 from . import ui_modules
 
 class IndexHandler(base.BaseHandler):
-       def get(self):
-               self.render("distros/index.html", distros=self.backend.distros)
+       async def get(self):
+               # Fetch all distributions
+               distros = [distro async for distro in self.backend.distros]
+
+               self.render("distros/index.html", distros=distros)
 
 
 class ShowHandler(base.BaseHandler):
-       def get(self, slug):
-               distro = self.backend.distros.get_by_slug(slug)
+       async def get(self, slug):
+               distro = await self.backend.distros.get_by_slug(slug)
                if not distro:
                        raise tornado.web.HTTPError(404, "Could not find distro: %s" % slug)
 
-               self.render("distros/show.html", distro=distro)
+               # Fetch the latest release
+               latest_release = await distro.get_latest_release()
+
+               # Fetch all repos
+               repos = [repo async for repo in distro.get_repos()]
+
+               # Fetch all sources
+               sources = [source async for source in distro.get_sources()]
+
+               self.render("distros/show.html", distro=distro,
+                       latest_release=latest_release, repos=repos, sources=sources)
 
 
 class EditHandler(base.BaseHandler):
        @tornado.web.authenticated
        async def get(self, slug):
-               distro = self.backend.distros.get_by_slug(slug)
+               distro = await self.backend.distros.get_by_slug(slug)
                if not distro:
                        raise tornado.web.HTTPError(404, "Could not find distribution: %s" % slug)
 
@@ -36,8 +50,8 @@ class EditHandler(base.BaseHandler):
                self.render("distros/edit.html", distro=distro, bugzilla_products=bugzilla_products)
 
        @tornado.web.authenticated
-       def post(self, slug):
-               distro = self.backend.distros.get_by_slug(slug)
+       async def post(self, slug):
+               distro = await self.backend.distros.get_by_slug(slug)
                if not distro:
                        raise tornado.web.HTTPError(404, "Could not find distribution: %s" % slug)
 
@@ -56,8 +70,8 @@ class EditHandler(base.BaseHandler):
 
 
 class ReleasesIndexHandler(base.BaseHandler):
-       def get(self, distro_slug):
-               distro = self.backend.distros.get_by_slug(distro_slug)
+       async def get(self, distro_slug):
+               distro = await self.backend.distros.get_by_slug(distro_slug)
                if not distro:
                        raise tornado.web.HTTPError(404, "Could not find distro: %s" % distro_slug)
 
@@ -72,13 +86,13 @@ class ReleasesIndexHandler(base.BaseHandler):
 
 
 class ReleasesShowHandler(base.BaseHandler):
-       def get(self, distro_slug, release_slug):
-               distro = self.backend.distros.get_by_slug(distro_slug)
+       async def get(self, distro_slug, release_slug):
+               distro = await self.backend.distros.get_by_slug(distro_slug)
                if not distro:
                        raise tornado.web.HTTPError(404, "Could not find distro: %s" % distro_slug)
 
                # Fetch the release
-               release = distro.get_release(release_slug)
+               release = await distro.get_release(release_slug)
                if not release:
                        raise tornado.web.HTTPError(404, "Could not find release %s" % release_slug)
 
@@ -89,8 +103,8 @@ class ReleasesShowHandler(base.BaseHandler):
 
 class ReleasesCreateHandler(base.BaseHandler):
        @tornado.web.authenticated
-       def get(self, distro_slug):
-               distro = self.backend.distros.get_by_slug(distro_slug)
+       async def get(self, distro_slug):
+               distro = await self.backend.distros.get_by_slug(distro_slug)
                if not distro:
                        raise tornado.web.HTTPError(404, "Could not find distro: %s" % distro_slug)
 
@@ -101,8 +115,8 @@ class ReleasesCreateHandler(base.BaseHandler):
                self.render("distros/releases/edit.html", release=None, distro=distro)
 
        @tornado.web.authenticated
-       def post(self, distro_slug):
-               distro = self.backend.distros.get_by_slug(distro_slug)
+       async def post(self, distro_slug):
+               distro = await self.backend.distros.get_by_slug(distro_slug)
                if not distro:
                        raise tornado.web.HTTPError(404, "Could not find distro: %s" % distro_slug)
 
index 4074c5a0b9c254318d350daf1b19f75af0382889..9d2987fa17f2996f4ada9dc7f7b37aa948034ec8 100644 (file)
@@ -5,19 +5,19 @@ import tornado.web
 from . import base
 
 class IndexHandler(base.BaseHandler):
-       def get(self):
-               args = {
+       async def get(self):
+               async with await self.db.transaction():
                        # Fetch all running jobs
-                       "running_jobs" : self.backend.jobs.running,
+                       running_jobs  = [job async for job in self.backend.jobs.get_running()]
 
                        # Fetch finished jobs
-                       "finished_jobs" : self.backend.jobs.get_finished(limit=8),
+                       finished_jobs = [job async for job in self.backend.jobs.get_finished(limit=8)]
 
                        # Fetch the total length of the queue
-                       "queue_length" : len(self.backend.jobs.queue),
-               }
+                       queue_length  = await self.backend.jobs.queue.get_length()
 
-               self.render("index.html", **args)
+               self.render("index.html", running_jobs=running_jobs,
+                       finished_jobs=finished_jobs, queue_length=queue_length)
 
 
 class LogHandler(base.BaseHandler):
index f5c8761e979d6e84ffafe00b64a0e9093d443b45..0412151c721998d8050c28455d245132292570d3 100644 (file)
@@ -20,9 +20,9 @@ class APIv1ControlHandler(base.APIMixin, base.BackendMixin, tornado.websocket.We
        # Don't allow users to authenticate
        allow_users = False
 
-       @tornado.web.authenticated
-       def open(self, job_id):
-               self.job = self.backend.jobs.get_by_uuid(job_id)
+       @base.negotiate
+       async def open(self, job_id):
+               self.job = await self.backend.jobs.get_by_uuid(job_id)
                if not self.job:
                        raise tornado.web.HTTPError(404, "Could not find job %s" % job_id)
 
@@ -69,7 +69,7 @@ class APIv1ControlHandler(base.APIMixin, base.BackendMixin, tornado.websocket.We
 class APIv1IndexHandler(base.APIMixin, tornado.web.RequestHandler):
        @base.negotiate
        async def post(self, uuid):
-               job = self.backend.jobs.get_by_uuid(uuid)
+               job = await self.backend.jobs.get_by_uuid(uuid)
                if not job:
                        raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
 
@@ -105,7 +105,7 @@ class APIv1IndexHandler(base.APIMixin, tornado.web.RequestHandler):
 class APIv1LogStreamHandler(base.BackendMixin, tornado.websocket.WebSocketHandler):
        # No authentication required
        async def open(self, uuid):
-               job = self.backend.jobs.get_by_uuid(uuid)
+               job = await self.backend.jobs.get_by_uuid(uuid)
                if not job:
                        raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
 
@@ -139,7 +139,7 @@ class APIv1LogStreamHandler(base.BackendMixin, tornado.websocket.WebSocketHandle
 
 
 class IndexHandler(base.BaseHandler):
-       def get(self):
+       async def get(self):
                # Pagination
                offset = self.get_argument_int("offset", None) or 0
                limit  = self.get_argument_int("limit", None) or 50
@@ -152,7 +152,7 @@ class IndexHandler(base.BaseHandler):
                                limit=limit, offset=offset)
 
                        # Group jobs by date
-                       jobs = misc.group(jobs, lambda job: job.finished_at.date())
+                       jobs = await misc.group(jobs, lambda job: job.finished_at.date())
 
                self.render("jobs/index.html", jobs=jobs, limit=limit, offset=offset,
                        failed_only=failed_only)
@@ -165,7 +165,7 @@ class QueueHandler(base.BaseHandler):
 
 class LogHandler(base.BaseHandler):
        async def get(self, uuid):
-               job = self.backend.jobs.get_by_uuid(uuid)
+               job = await self.backend.jobs.get_by_uuid(uuid)
                if not job:
                        raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
 
@@ -194,8 +194,8 @@ class LogHandler(base.BaseHandler):
 
 class AbortHandler(base.BaseHandler):
        @tornado.web.authenticated
-       def get(self, uuid):
-               job = self.backend.jobs.get_by_uuid(uuid)
+       async def get(self, uuid):
+               job = await self.backend.jobs.get_by_uuid(uuid)
                if not job:
                        raise tornado.web.HTTPError(404, "Job not found: %s" % uuid)
 
@@ -207,7 +207,7 @@ class AbortHandler(base.BaseHandler):
 
        @tornado.web.authenticated
        async def post(self, uuid):
-               job = self.backend.jobs.get_by_uuid(uuid)
+               job = await self.backend.jobs.get_by_uuid(uuid)
                if not job:
                        raise tornado.web.HTTPError(404, "Job not found: %s" % uuid)
 
@@ -215,7 +215,7 @@ class AbortHandler(base.BaseHandler):
                if not job.has_perm(self.current_user):
                        raise tornado.web.HTTPError(403)
 
-               with self.db.transaction():
+               async with await self.db.transaction():
                        await job.abort(self.current_user)
 
                self.redirect("/builds/%s" % job.build.uuid)
@@ -223,8 +223,8 @@ class AbortHandler(base.BaseHandler):
 
 class RetryHandler(base.BaseHandler):
        @tornado.web.authenticated
-       def get(self, uuid):
-               job = self.backend.jobs.get_by_uuid(uuid)
+       async def get(self, uuid):
+               job = await self.backend.jobs.get_by_uuid(uuid)
                if not job:
                        raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
 
@@ -236,11 +236,11 @@ class RetryHandler(base.BaseHandler):
 
        @tornado.web.authenticated
        async def post(self, uuid):
-               job = self.backend.jobs.get_by_uuid(uuid)
+               job = await self.backend.jobs.get_by_uuid(uuid)
                if not job:
                        raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
 
-               with self.db.transaction():
+               async with await self.db.transaction():
                        job = await job.retry(self.current_user)
 
                        # Launch the newly created job
index 2ab2c523e6561fec6ea66eea0836531764f034fe..6918b4f22b7996923460da41f882557e4e70bc51 100644 (file)
@@ -10,11 +10,12 @@ from . import base
 from . import ui_modules
 
 class IndexHandler(base.BaseHandler):
-       def get(self):
+       async def get(self):
+               packages = self.backend.packages.get_list()
+
                # Sort all packages in an array like "<first char>" --> [packages, ...]
                # to print them in a table for each letter of the alphabet.
-               packages = misc.group(self.backend.packages.get_list(),
-                       lambda pkg: pkg.name[0].lower())
+               packages = await misc.group(packages, lambda pkg: pkg.name[0].lower())
 
                self.render("packages/index.html", packages=packages)
 
@@ -26,17 +27,20 @@ class NameHandler(base.BaseHandler):
                        raise tornado.web.HTTPError(404, "Package '%s' was not found" % name)
 
                # Fetch all distributions
-               distros = self.backend.distros
+               distros = {}
 
                # Get the latest bugs from Bugzilla
-               bugs = dict(zip(distros, await asyncio.gather(
-                       *(self.backend.bugzilla.search(component=name, **distro.bugzilla_fields)
-                               for distro in distros),
-               )))
+               async with asyncio.TaskGroup() as tasks:
+                       async for distro in self.backend.distros:
+                               distros[distro] = tasks.create_task(
+                                       self.backend.bugzilla.search(component=name, **distro.bugzilla_fields),
+                               )
+
+               bugs = { distro : await distros[distro] for distro in distros }
 
                # Fetch my own builds
                if self.current_user:
-                       scratch_builds = misc.group(
+                       scratch_builds = await misc.group(
                                self.current_user.get_builds_by_name(name), lambda build: build.distro,
                        )
                else:
@@ -47,7 +51,7 @@ class NameHandler(base.BaseHandler):
 
 
 class NameBuildsHandler(base.BaseHandler):
-       def get(self, name):
+       async def get(self, name):
                build = self.backend.builds.get_latest_by_name(name)
                if not build:
                        raise tornado.web.HTTPError(404, "Package '%s' was not found" % name)
@@ -59,21 +63,21 @@ class NameBuildsHandler(base.BaseHandler):
                release_builds = self.backend.builds.get_release_builds_by_name(name)
 
                # Group them by distribution
-               distros = misc.group(release_builds, lambda build: build.distro)
+               distros = await misc.group(release_builds, lambda build: build.distro)
 
                # Find the latest scratch builds
                scratch_builds = self.backend.builds.get_scratch_builds_by_name(name)
 
                # Group them by user
-               users = misc.group(scratch_builds, lambda build: build.owner)
+               users = await misc.group(scratch_builds, lambda build: build.owner)
 
                self.render("packages/name/builds.html", limit=limit,
                        package=build.pkg, distros=distros, users=users)
 
 
 class ShowHandler(base.BaseHandler):
-       def get(self, uuid):
-               package = self.backend.packages.get_by_uuid(uuid)
+       async def get(self, uuid):
+               package = await self.backend.packages.get_by_uuid(uuid)
                if not package:
                        raise tornado.web.HTTPError(404, "Could not find package: %s" % uuid)
 
@@ -82,12 +86,12 @@ class ShowHandler(base.BaseHandler):
 
 class FileDownloadHandler(base.BaseHandler):
        async def get(self, uuid, path):
-               package = self.backend.packages.get_by_uuid(uuid)
+               package = await self.backend.packages.get_by_uuid(uuid)
                if not package:
                        raise tornado.web.HTTPError(404, "Could not find package: %s" % uuid)
 
                # Fetch the file
-               file = package.get_file(path)
+               file = await package.get_file(path)
                if not file:
                        raise tornado.web.HTTPError(404, "Could not find file %s in %s" % (path, package))
 
@@ -114,12 +118,12 @@ class FileDownloadHandler(base.BaseHandler):
 
 class FileViewHandler(base.BaseHandler):
        async def get(self, uuid, path):
-               package = self.backend.packages.get_by_uuid(uuid)
+               package = await self.backend.packages.get_by_uuid(uuid)
                if not package:
                        raise tornado.web.HTTPError(404, "Could not find package: %s" % uuid)
 
                # Fetch the file
-               file = package.get_file(path)
+               file = await package.get_file(path)
                if not file:
                        raise tornado.web.HTTPError(404, "Could not find file %s in %s" % (path, package))