From: Michael Tremer Date: Sun, 12 Jan 2025 11:51:21 +0000 (+0000) Subject: Try to make this entire application async X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=515f990042d3171edacea0fcb782eb207550dc09;p=pbs.git Try to make this entire application async We will need this so that we won't run out of database file descriptors as psycopg3 currently can only use select() which does not support file descriptors with numbers over 1024. We will also need this to scale this application better. However, Tornado has some limitations in its template engine which is why we will need to replace the template engine with Jinja. Signed-off-by: Michael Tremer --- diff --git a/src/buildservice/__init__.py b/src/buildservice/__init__.py index 197f5d10..93cdd1ba 100644 --- a/src/buildservice/__init__.py +++ b/src/buildservice/__init__.py @@ -112,7 +112,8 @@ class Backend(object): self.run_periodic_task(300, self.sources.fetch) # Regularly check for new releases - self.run_periodic_task(300, self.monitorings.check) + # XXX Disabled for now + #self.run_periodic_task(300, self.monitorings.check) # Cleanup regularly self.run_periodic_task(3600, self.cleanup) @@ -600,7 +601,7 @@ class Backend(object): ] # Add all mirrored repositories - for repo in self.repos.mirrored: + async for repo in await self.repos.mirrored: path = os.path.relpath(repo.local_path(), self.basepath) commandline.append("--include=%s***" % path) diff --git a/src/buildservice/base.py b/src/buildservice/base.py index c42478da..ffe8b716 100644 --- a/src/buildservice/base.py +++ b/src/buildservice/base.py @@ -48,43 +48,39 @@ class DataObject(Object): def __hash__(self): return hash(self.id) - def init(self, id, data=None, **kwargs): - self.id = id - - if data: - self.data = data + def init(self, data=None, **kwargs): + self.data = data # Set any extra arguments (to populate the cache) for arg in kwargs: setattr(self, arg, kwargs[arg]) - @lazy_property - def data(self): - assert self.table, "Table name is 
not set" - assert self.id + @property + def id(self): + return self.data.id - return self.db.get("SELECT * FROM %s \ - WHERE id = %%s" % self.table, self.id) + async def _set_attribute(self, key, val): + assert self.table, "Table name not set" + assert self.id - def _set_attribute(self, key, val): # Detect if an update is needed if self.data[key] == val: return - self.db.execute("UPDATE %s SET %s = %%s \ + await self.db.execute("UPDATE %s SET %s = %%s \ WHERE id = %%s" % (self.table, key), val, self.id) # Update the cached attribute self.data[key] = val - def _set_attribute_now(self, key): - """ - Sets the given key to CURRENT_TIMESTAMP - """ - res = self.db.get("UPDATE %s SET %s = CURRENT_TIMESTAMP \ + async def _set_attribute_now(self, key): + assert self.table, "Table name not set" + assert self.id + + res = await self.db.execute("UPDATE %s SET %s = CURRENT_TIMESTAMP \ WHERE id = %%s RETURNING %s" % (self.table, key, key), self.id) - # Update cached attribute + # Update the cached attribute if res: self.data[key] = res[key] diff --git a/src/buildservice/builders.py b/src/buildservice/builders.py index c51c3a04..b6049477 100644 --- a/src/buildservice/builders.py +++ b/src/buildservice/builders.py @@ -17,33 +17,27 @@ class Builders(base.Object): # Stores any control connections to builders connections = {} - def _get_builder(self, query, *args): - res = self.db.get(query, *args) + async def _get_builders(self, *args, **kwargs): + return await self.db.fetch_many(Builder, *args, **kwargs) - if res: - return Builder(self.backend, res.id, data=res) - - def _get_builders(self, query, *args): - res = self.db.query(query, *args) - - for row in res: - yield Builder(self.backend, row.id, data=row) + async def _get_builder(self, *args, **kwargs): + return await self.db.fetch_one(Builder, *args, **kwargs) - def __iter__(self): - builders = self._get_builders("SELECT * FROM builders \ + async def __aiter__(self): + builders = await self._get_builders("SELECT * FROM builders \ 
WHERE deleted_at IS NULL ORDER BY name") - return iter(builders) + return aiter(builders) def init(self): # Initialize stats self.stats = BuildersStats(self.backend) - def create(self, name, user=None): + async def create(self, name, user=None): """ Creates a new builder. """ - builder = self._get_builder(""" + builder = await self._get_builder(""" INSERT INTO builders ( @@ -61,8 +55,8 @@ class Builders(base.Object): return builder - def get_by_id(self, id): - return self._get_builder(""" + async def get_by_id(self, id): + return await self._get_builder(""" SELECT * FROM @@ -72,8 +66,8 @@ class Builders(base.Object): """, id, ) - def get_by_name(self, name): - return self._get_builder(""" + async def get_by_name(self, name): + return await self._get_builder(""" SELECT * FROM @@ -117,7 +111,7 @@ class Builders(base.Object): threshold = datetime.timedelta(minutes=5) # Fetch all builders - builders = [b for b in self] + builders = [b async for b in self] # Fetch the priority for each builder builders = dict( @@ -338,7 +332,7 @@ class Builder(base.DataObject): Logs some stats about this builder """ # Update information - self.db.execute(""" + await self.db.execute(""" UPDATE builders SET @@ -361,7 +355,7 @@ class Builder(base.DataObject): ) # Log Stats - stats = self._get_stats(""" + stats = await self._get_stats(""" INSERT INTO builder_stats ( diff --git a/src/buildservice/builds.py b/src/buildservice/builds.py index f7688cc8..964d151f 100644 --- a/src/buildservice/builds.py +++ b/src/buildservice/builds.py @@ -17,12 +17,12 @@ from .decorators import * log = logging.getLogger("pbs.builds") class Builds(base.Object): - def _get_build(self, query, *args, **kwargs): - return self.db.fetch_one(Build, query, *args, **kwargs) - def _get_builds(self, query, *args, **kwargs): return self.db.fetch_many(Build, query, *args, **kwargs) + async def _get_build(self, query, *args, **kwargs): + return await self.db.fetch_one(Build, query, *args, **kwargs) + def __len__(self): res = 
self.db.get(""" SELECT @@ -38,8 +38,8 @@ class Builds(base.Object): return res.builds - def get_by_id(self, id): - return self._get_build(""" + async def get_by_id(self, id): + return await self._get_build(""" SELECT * FROM @@ -49,8 +49,8 @@ class Builds(base.Object): """, id, ) - def get_by_uuid(self, uuid): - return self._get_build(""" + async def get_by_uuid(self, uuid): + return await self._get_build(""" SELECT * FROM @@ -89,7 +89,7 @@ class Builds(base.Object): """ Returns all builds by this name """ - builds = self._get_builds(""" + return self._get_builds(""" SELECT builds.* FROM @@ -107,10 +107,8 @@ class Builds(base.Object): """, name, ) - return list(builds) - def get_release_builds_by_name(self, name, limit=None): - builds = self._get_builds(""" + return self._get_builds(""" WITH builds AS ( SELECT builds.*, @@ -143,10 +141,8 @@ class Builds(base.Object): """, name, limit, ) - return list(builds) - def get_scratch_builds_by_name(self, name, limit=None): - builds = self._get_builds(""" + return self._get_builds(""" WITH builds AS ( SELECT builds.*, @@ -181,8 +177,6 @@ class Builds(base.Object): """, name, limit, ) - return list(builds) - def get_recent(self, name=None, limit=None, offset=None): """ Returns the most recent (non-test) builds @@ -190,7 +184,7 @@ class Builds(base.Object): if name: return self.get_recent_by_name(name, limit=limit, offset=offset) - builds = self._get_builds(""" + return self._get_builds(""" SELECT * FROM @@ -208,13 +202,11 @@ class Builds(base.Object): limit, offset, ) - return list(builds) - def get_recent_by_name(self, name, limit=None, offset=None): """ Returns the most recent (non-test) builds """ - builds = self._get_builds(""" + return self._get_builds(""" SELECT builds.* FROM @@ -238,13 +230,11 @@ class Builds(base.Object): name, limit, offset, ) - return list(builds) - def get_by_package_uuids(self, uuids): """ Returns a list of builds that contain the given packages """ - builds = self._get_builds(""" + return 
self._get_builds(""" SELECT DISTINCT builds.* FROM @@ -274,8 +264,6 @@ class Builds(base.Object): """, uuids, uuids, ) - return list(builds) - async def create(self, repo, package, owner=None, group=None, test=False, disable_test_builds=False, timeout=None): """ @@ -293,7 +281,7 @@ class Builds(base.Object): if timeout is None: timeout = datetime.timedelta(hours=3) - build = self._get_build(""" + build = await self._get_build(""" INSERT INTO builds ( @@ -325,14 +313,14 @@ class Builds(base.Object): group.builds.append(build) # Create all jobs - build._create_jobs(timeout=timeout) + await build._create_jobs(timeout=timeout) if not build.is_test(): # Deprecate previous builds - build._deprecate_others() + await build._deprecate_others() # Add watchers - build._add_watchers() + await build._add_watchers() # Add the build into its repository await repo.add_build(build, user=owner) @@ -433,7 +421,7 @@ class Build(base.DataObject): # Mark as deleted self._set_attribute_now("deleted_at") if user: - self._set_attribute("deleted_by", user) + await self._set_attribute("deleted_by", user) # Delete source package await self.pkg.delete(user=user) @@ -467,6 +455,8 @@ class Build(base.DataObject): def finished_at(self): return self.data.finished_at + # Owner + def get_owner(self): """ The owner of this build. 
@@ -474,10 +464,8 @@ class Build(base.DataObject): if self.data.owner_id: return self.backend.users.get_by_id(self.data.owner_id) - def set_owner(self, owner): - self._set_attribute("owner_id", owner) - - owner = lazy_property(get_owner, set_owner) + async def set_owner(self, owner): + await self._set_attribute("owner_id", owner) # Build Repository @@ -502,13 +490,13 @@ class Build(base.DataObject): def is_broken(self): return self.state == "broken" - def set_severity(self, severity): - self._set_attribute("severity", severity) + # Severity def get_severity(self): return self.data.severity - severity = property(get_severity, set_severity) + async def set_severity(self, severity): + await self._set_attribute("severity", severity) @lazy_property def commit(self): @@ -581,13 +569,8 @@ class Build(base.DataObject): # Jobs - def _get_jobs(self, query, *args): - ret = [] - for job in self.backend.jobs._get_jobs(query, *args): - job.build = self - ret.append(job) - - return ret + async def _get_jobs(self, *args, **kwargs): + return self.backend.jobs._get_jobs(*args, build=self, **kwargs) @property def jobs(self): @@ -672,12 +655,12 @@ class Build(base.DataObject): # Points - def add_points(self, points, user=None): + async def add_points(self, points, user=None): """ Add points (can be negative) """ # Log points - self.db.execute(""" + await self.db.execute(""" INSERT INTO build_points ( @@ -693,7 +676,7 @@ class Build(base.DataObject): ) # Update the cache - self._set_attribute("points", self.points + points) + await self._set_attribute("points", self.points + points) @property def points(self): @@ -705,8 +688,8 @@ class Build(base.DataObject): ## Watchers @lazy_property - def watchers(self): - users = self.backend.users._get_users(""" + async def watchers(self): + users = await self.backend.users._get_users(""" SELECT users.* FROM @@ -724,11 +707,11 @@ class Build(base.DataObject): return set(users) - def add_watcher(self, user): + async def add_watcher(self, user): """ 
Adds a watcher to this build """ - self.db.execute(""" + await self.db.execute(""" INSERT INTO build_watchers( build_id, @@ -746,11 +729,11 @@ class Build(base.DataObject): # Add to cache self.watchers.add(user) - def remove_watcher(self, user): + async def remove_watcher(self, user): """ Removes a watcher from this build """ - self.db.execute(""" + await self.db.execute(""" UPDATE build_watchers SET @@ -770,7 +753,7 @@ class Build(base.DataObject): except KeyError: pass - def _add_watchers(self): + async def _add_watchers(self): """ Called when a new build is created and automatically adds any watchers """ @@ -797,15 +780,15 @@ class Build(base.DataObject): log.error("Build %s has failed" % self) # Mark as finished - self._set_attribute_now("finished_at") + await self._set_attribute_now("finished_at") # Mark as failed if the build was not successful if not success: - self._set_attribute("failed", True) + await self._set_attribute("failed", True) # Award some negative points on failure if not success: - self.add_points(-1) + await self.add_points(-1) # Notify everyone this build has finished... 
if success: @@ -859,16 +842,16 @@ class Build(base.DataObject): continue # Send an email to the user - user.send_email(*args, build=self, **kwargs) + await user.send_email(*args, build=self, **kwargs) # Repositories @lazy_property - def repos(self): + async def repos(self): """ Return a list of all repositories this package is in """ - repos = self.backend.repos._get_repositories(""" + repos = await self.backend.repos._get_repositories(""" SELECT repositories.* FROM @@ -931,8 +914,8 @@ class Build(base.DataObject): ## Bugs @lazy_property - def bug_ids(self): - rows = self.db.query(""" + async def bug_ids(self): + rows = await self.db.query(""" SELECT bug_id FROM @@ -948,8 +931,8 @@ class Build(base.DataObject): return set([row.bug_id for row in rows]) - def add_bug(self, bug_id, user=None): - self.db.execute(""" + async def add_bug(self, bug_id, user=None): + await self.db.execute(""" INSERT INTO build_bugs ( @@ -975,8 +958,8 @@ class Build(base.DataObject): # Add to cache self.bug_ids.add(bug_id) - def remove_bug(self, bug_id, user): - self.db.execute(""" + async def remove_bug(self, bug_id, user): + await self.db.execute(""" UPDATE build_bugs SET @@ -1004,11 +987,11 @@ class Build(base.DataObject): # Monitoring Release @lazy_property - def monitoring_release(self): + async def monitoring_release(self): """ Returns the Monitoring Release """ - return self.backend.monitorings._get_release(""" + return await self.backend.monitorings._get_release(""" SELECT * FROM @@ -1023,13 +1006,13 @@ class Build(base.DataObject): # Deprecation - def _deprecate_others(self): + async def _deprecate_others(self): """ Called when this build is being created. This method will find similar builds and automatically deprecate them. 
""" - builds = self.backend.builds._get_builds(""" + builds = await self.backend.builds._get_builds(""" SELECT builds.* @@ -1079,9 +1062,9 @@ class Build(base.DataObject): # Deprecate all builds for build in builds: - build.deprecate(build=self) + await build.deprecate(build=self) - def deprecate(self, build=None, user=None): + async def deprecate(self, build=None, user=None): """ Called when a build needs to be deprecated """ @@ -1095,9 +1078,9 @@ class Build(base.DataObject): user = build.owner # Mark as deprecated - self._set_attribute_now("deprecated_at") + await self._set_attribute_now("deprecated_at") if user: - self._set_attribute("deprecated_by", user) + await self._set_attribute("deprecated_by", user) # Store the build if build: @@ -1114,25 +1097,25 @@ class Build(base.DataObject): return False @lazy_property - def deprecated_by(self): + async def deprecated_by(self): if self.data.deprecated_by: - return self.backend.users.get_by_id(self.data.deprecated_by) + return await self.backend.users.get_by_id(self.data.deprecated_by) - def get_deprecating_build(self): - if self.data.deprecating_build_id: - return self.backend.builds.get_by_id(self.data.deprecating_build_id) + # Deprecating Build - def set_deprecating_build(self, build): - self._set_attribute("deprecating_build_id", build) + async def get_deprecating_build(self): + if self.data.deprecating_build_id: + return await self.backend.builds.get_by_id(self.data.deprecating_build_id) - deprecating_build = lazy_property(get_deprecating_build, set_deprecating_build) + async def set_deprecating_build(self, build): + await self._set_attribute("deprecating_build_id", build) @lazy_property - def deprecated_builds(self): + async def deprecated_builds(self): """ Returns a list of builds that were deprecated by this build """ - builds = self.backend.builds._get_builds(""" + builds = await self.backend.builds._get_builds(""" SELECT * FROM @@ -1173,8 +1156,8 @@ class Build(base.DataObject): return False @lazy_property 
- def test_build_for(self): - return self.backend.builds._get_build(""" + async def test_build_for(self): + return await self.backend.builds._get_build(""" WITH build_test_builds AS ( SELECT builds.id AS build_id, @@ -1524,20 +1507,14 @@ class Group(base.DataObject): class Comments(base.Object): - def _get_comments(self, query, *args): - res = self.db.query(query, *args) - - for row in res: - yield Comment(self.backend, row.id, data=row) + def _get_comments(self, *args, **kwargs): + return self.db.fetch_many(Comment, *args, **kwargs) - def _get_comment(self, query, *args): - res = self.db.get(query, *args) - - if res: - return Comment(self.backend, res.id, data=res) + def _get_comment(self, *args, **kwargs): + return self.db.fetch_one(Comment, *args, **kwargs) - def get_by_id(self, id): - return self._get_comment(""" + async def get_by_id(self, id): + return await self._get_comment(""" SELECT * FROM diff --git a/src/buildservice/database.py b/src/buildservice/database.py index bd5b19f3..3feaa0cd 100644 --- a/src/buildservice/database.py +++ b/src/buildservice/database.py @@ -9,7 +9,6 @@ """ import asyncio -import itertools import logging import psycopg import psycopg_pool @@ -44,29 +43,29 @@ class Connection(object): self.__connections = {} # Create a connection pool - self.pool = psycopg_pool.ConnectionPool( + self.pool = psycopg_pool.AsyncConnectionPool( "postgresql://%s:%s@%s/%s" % (user, password, host, database), # Callback to configure any new connections configure=self.__configure, # Set limits for min/max connections in the pool - min_size=8, - max_size=1024, + min_size=4, + max_size=32, # Give clients up to one minute to retrieve a connection timeout=60, - # Close connections after they have been idle for 5 seconds - max_idle=5, + # Close connections after they have been idle for a few minutes + max_idle=120, ) - def __configure(self, conn): + async def __configure(self, conn): """ Configures any newly opened connections """ # Enable autocommit - 
conn.autocommit = True + await conn.set_autocommit(True) # Return any rows as dicts conn.row_factory = psycopg.rows.dict_row @@ -74,7 +73,7 @@ class Connection(object): # Automatically convert DataObjects conn.adapters.register_dumper(base.DataObject, base.DataObjectDumper) - def connection(self, *args, **kwargs): + async def connection(self, *args, **kwargs): """ Returns a connection from the pool """ @@ -90,16 +89,24 @@ class Connection(object): pass # Fetch a new connection from the pool - conn = self.__connections[task] = self.pool.getconn(*args, **kwargs) + conn = self.__connections[task] = await self.pool.getconn(*args, **kwargs) log.debug("Assigning database connection %s to %s" % (conn, task)) # When the task finishes, release the connection - task.add_done_callback(self.__release_connection) + task.add_done_callback(self.release_connection) return conn - def __release_connection(self, task): + def release_connection(self, task): + """ + Called when a task that requested a connection has finished. + + This method will schedule that the connection is being returned into the pool. + """ + self.backend.run_task(self.__release_connection, task) + + async def __release_connection(self, task): # Retrieve the connection try: conn = self.__connections[task] @@ -112,9 +119,9 @@ class Connection(object): del self.__connections[task] # Return the connection back into the pool - self.pool.putconn(conn) + await self.pool.putconn(conn) - def _execute(self, cursor, execute, query, parameters): + async def _execute(self, cursor, execute, query, parameters): # Store the time we started this query t = time.monotonic() @@ -124,7 +131,7 @@ class Connection(object): pass # Execute the query - execute(query, parameters) + await execute(query, parameters) # How long did this take? 
elapsed = time.monotonic() - t @@ -132,73 +139,76 @@ class Connection(object): # Log the query time log.debug(" Query time: %.2fms" % (elapsed * 1000)) - def query(self, query, *parameters, **kwparameters): + async def query(self, query, *parameters, **kwparameters): """ Returns a row list for the given query and parameters. """ - conn = self.connection() + conn = await self.connection() - with conn.cursor() as cursor: - self._execute(cursor, cursor.execute, query, parameters or kwparameters) + async with conn.cursor() as cursor: + await self._execute(cursor, cursor.execute, query, parameters or kwparameters) - return [Row(row) for row in cursor] + async for row in cursor: + yield Row(row) - def get(self, query, *parameters, **kwparameters): + async def get(self, query, *parameters, **kwparameters): """ Returns the first row returned for the given query. """ - rows = self.query(query, *parameters, **kwparameters) - if not rows: - return None - elif len(rows) > 1: - raise Exception("Multiple rows returned for Database.get() query") - else: - return rows[0] + rows = [] + + async for row in self.query(query, *parameters, **kwparameters): + rows.append(row) + + if len(rows) > 1: + raise Exception("Multiple rows returned for Database.get() query") + + return rows[0] if rows else None - def execute(self, query, *parameters, **kwparameters): + async def execute(self, query, *parameters, **kwparameters): """ Executes the given query. """ - conn = self.connection() + conn = await self.connection() - with conn.cursor() as cursor: - self._execute(cursor, cursor.execute, query, parameters or kwparameters) + async with conn.cursor() as cursor: + await self._execute(cursor, cursor.execute, query, parameters or kwparameters) - def executemany(self, query, parameters): + async def executemany(self, query, parameters): """ Executes the given query against all the given param sequences. 
""" - conn = self.connection() + conn = await self.connection() - with conn.cursor() as cursor: - self._execute(cursor, cursor.executemany, query, parameters) + async with conn.cursor() as cursor: + await self._execute(cursor, cursor.executemany, query, parameters) - def transaction(self): + async def transaction(self): """ Creates a new transaction on the current tasks' connection """ - conn = self.connection() + conn = await self.connection() return conn.transaction() - def fetch_one(self, cls, query, *args, **kwargs): + async def fetch_one(self, cls, query, *args, **kwargs): """ Takes a class and a query and will return one object of that class """ # Execute the query - res = self.get(query, *args) + res = await self.get(query, *args) # Return an object (if possible) if res: - return cls(self.backend, res.id, res, **kwargs) + return cls(self.backend, data=res, **kwargs) - def fetch_many(self, cls, query, *args, **kwargs): + async def fetch_many(self, cls, query, *args, **kwargs): # Execute the query res = self.query(query, *args) # Return a generator with objects - for row in res: - yield cls(self.backend, row.id, row, **kwargs) + async for row in res: + yield cls(self.backend, data=row, **kwargs) class Row(dict): diff --git a/src/buildservice/distribution.py b/src/buildservice/distribution.py index b97dd9c1..6037b703 100644 --- a/src/buildservice/distribution.py +++ b/src/buildservice/distribution.py @@ -14,10 +14,10 @@ class Distributions(base.Object): def _get_distributions(self, query, *args): return self.db.fetch_many(Distribution, query, *args) - def _get_distribution(self, query, *args): - return self.db.fetch_one(Distribution, query, *args) + async def _get_distribution(self, query, *args): + return await self.db.fetch_one(Distribution, query, *args) - def __iter__(self): + def __aiter__(self): distros = self._get_distributions(""" SELECT * @@ -30,10 +30,10 @@ class Distributions(base.Object): """, ) - return iter(distros) + return aiter(distros) - def 
get_by_id(self, distro_id): - return self._get_distribution(""" + async def get_by_id(self, distro_id): + return await self._get_distribution(""" SELECT * FROM @@ -43,8 +43,8 @@ class Distributions(base.Object): """, distro_id, ) - def get_by_slug(self, slug): - return self._get_distribution(""" + async def get_by_slug(self, slug): + return await self._get_distribution(""" SELECT * FROM @@ -56,8 +56,8 @@ class Distributions(base.Object): """, slug, ) - def get_by_tag(self, tag): - return self._get_distribution(""" + async def get_by_tag(self, tag): + return await self._get_distribution(""" SELECT * FROM @@ -69,9 +69,9 @@ class Distributions(base.Object): """, tag, ) - def create(self, name, distro_id, version_id): + async def create(self, name, distro_id, version_id): # Insert into the database - return self._get_distribution(""" + return await self._get_distribution(""" INSERT INTO distributions ( @@ -281,9 +281,8 @@ class Distribution(base.DataObject): # Must be admin return user.is_admin() - @lazy_property - def repos(self): - repos = self.backend.repos._get_repositories(""" + def get_repos(self): + return self.backend.repos._get_repositories(""" SELECT * FROM @@ -295,9 +294,10 @@ class Distribution(base.DataObject): AND owner_id IS NULL""", self.id, - ) - return sorted(repos) + # Populate cache + distro=self, + ) def get_repo(self, slug): repo = self.backend.repos._get_repository(""" @@ -315,11 +315,10 @@ class Distribution(base.DataObject): slug = %s""", self.id, slug, - ) - # Cache - if repo: - repo.distro = self + # Populate cache + distro=self, + ) return repo @@ -339,7 +338,7 @@ class Distribution(base.DataObject): """ Returns all release builds that match the name """ - builds = self.backend.builds._get_builds(""" + return self.backend.builds._get_builds(""" SELECT builds.* FROM @@ -363,13 +362,10 @@ class Distribution(base.DataObject): """, self.id, name, limit, ) - return builds - # Sources - @lazy_property - def sources(self): - sources = 
self.backend.sources._get_sources(""" + def get_sources(self): + return self.backend.sources._get_sources(""" SELECT sources.* FROM @@ -383,12 +379,10 @@ class Distribution(base.DataObject): """, self.id, ) - return list(sources) - # Releases def get_releases(self, limit=None, offset=None): - releases = self.backend.distros.releases._get_releases(""" + return self.backend.distros.releases._get_releases(""" SELECT * FROM @@ -409,8 +403,6 @@ class Distribution(base.DataObject): distro=self, ) - return list(releases) - def get_release(self, slug): return self.backend.distros.releases._get_release(""" SELECT @@ -426,12 +418,11 @@ class Distribution(base.DataObject): """, self.id, slug, ) - @lazy_property - def latest_release(self): + async def get_latest_release(self): """ Returns the latest and published release """ - return self.backend.distros.releases._get_release(""" + return await self.backend.distros.releases._get_release(""" SELECT * FROM @@ -456,11 +447,11 @@ class Releases(base.Object): def _get_releases(self, query, *args, **kwargs): return self.db.fetch_many(Release, query, *args, **kwargs) - def _get_release(self, query, *args, **kwargs): - return self.db.fetch_one(Release, query, *args, **kwargs) + async def _get_release(self, query, *args, **kwargs): + return await self.db.fetch_one(Release, query, *args, **kwargs) - def get_by_id(self, id): - return self._get_release(""" + async def get_by_id(self, id): + return await self._get_release(""" SELECT * FROM @@ -470,14 +461,14 @@ class Releases(base.Object): """, id, ) - def create(self, distro, name, user, stable=False): + async def create(self, distro, name, user, stable=False): """ Creates a new release """ # Create a slug slug = misc.normalize(name) - release = self._get_release(""" + release = await self._get_release(""" INSERT INTO releases ( @@ -656,8 +647,8 @@ class Images(base.Object): def _get_images(self, query, *args, **kwargs): return self.db.fetch_many(Image, query, *args, **kwargs) - def 
_get_image(self, query, *args, **kwargs): - return self.db.fetch_one(Image, query, *args, **kwargs) + async def _get_image(self, query, *args, **kwargs): + return await self.db.fetch_one(Image, query, *args, **kwargs) class Image(base.DataObject): diff --git a/src/buildservice/events.py b/src/buildservice/events.py index 886ea4db..ee06e5a8 100644 --- a/src/buildservice/events.py +++ b/src/buildservice/events.py @@ -911,8 +911,8 @@ class Events(base.Object): def map(self): return { # Builds - "build" : self.backend.builds.get_by_id, - "by_build" : self.backend.builds.get_by_id, + "build" : self.backend.builds.get_by_id, + "by_build" : self.backend.builds.get_by_id, # Build Comments "build_comment" : self.backend.builds.comments.get_by_id, @@ -930,17 +930,17 @@ class Events(base.Object): "release" : self.backend.distros.releases.get_by_id, # Repositories - "repository" : self.backend.repos.get_by_id, + "repository" : self.backend.repos.get_by_id, # Builders "builder" : self.backend.builders.get_by_id, # Users - "user" : self.backend.users.get_by_id, - "by_user" : self.backend.users.get_by_id, + "user" : self.backend.users.get_by_id, + "by_user" : self.backend.users.get_by_id, } - def expand(self, events): + async def expand(self, events): """ Expands any log events """ @@ -968,7 +968,7 @@ class Events(base.Object): # Call the expand function on cache miss except KeyError: - value = expand(key) + value = await expand(key) # Store the expanded value try: @@ -981,7 +981,7 @@ class Events(base.Object): yield Event(self.backend, event) - def __call__(self, priority=None, offset=None, limit=None, + async def __call__(self, priority=None, offset=None, limit=None, build=None, builder=None, mirror=None, user=None): """ Returns all events filtered by the given criteria @@ -1015,7 +1015,7 @@ class Events(base.Object): values.append(priority) # Fetch all events - events = self.db.query( + events = await self.db.query( """ %s @@ -1039,7 +1039,7 @@ class Events(base.Object): ) # 
Expand all events - return self.expand(events) + return await self.expand(events) class Event(base.Object): diff --git a/src/buildservice/jobs.py b/src/buildservice/jobs.py index b2bb6581..c1af558b 100644 --- a/src/buildservice/jobs.py +++ b/src/buildservice/jobs.py @@ -78,14 +78,14 @@ class Jobs(base.Object): # Setup queue self.queue = Queue(self.backend) - def _get_job(self, query, *args, **kwargs): - return self.db.fetch_one(Job, query, *args, **kwargs) + def _get_jobs(self, *args, **kwargs): + return self.db.fetch_many(Job, *args, **kwargs) - def _get_jobs(self, query, *args, **kwargs): - return self.db.fetch_many(Job, query, *args, **kwargs) + async def _get_job(self, *args, **kwargs): + return await self.db.fetch_one(Job, *args, **kwargs) - def create(self, build, arch, superseeds=None, timeout=None): - job = self._get_job(""" + async def create(self, build, arch, superseeds=None, timeout=None): + job = await self._get_job(""" INSERT INTO jobs ( @@ -112,11 +112,11 @@ class Jobs(base.Object): return job - def get_by_id(self, id): - return self._get_job("SELECT * FROM jobs WHERE id = %s", id) + async def get_by_id(self, id): + return await self._get_job("SELECT * FROM jobs WHERE id = %s", id) - def get_by_uuid(self, uuid): - return self._get_job("SELECT * FROM jobs WHERE uuid = %s", uuid) + async def get_by_uuid(self, uuid): + return await self._get_job("SELECT * FROM jobs WHERE uuid = %s", uuid) def get_finished(self, failed_only=False, limit=None, offset=None): """ @@ -160,11 +160,10 @@ class Jobs(base.Object): %s """, limit, offset) - return list(jobs) + return jobs - @property - def running(self): - jobs = self._get_jobs(""" + def get_running(self): + return self._get_jobs(""" SELECT jobs.* FROM @@ -179,8 +178,6 @@ class Jobs(base.Object): started_at DESC """) - return list(jobs) - async def launch(self, jobs): """ Called to launch all given jobs @@ -220,7 +217,7 @@ class Jobs(base.Object): """) # Abort them all... 
- for job in jobs: + async for job in jobs: await job.abort() @@ -228,13 +225,13 @@ class Queue(base.Object): # Locked when the queue is being processed lock = asyncio.Lock() - def __iter__(self): - jobs = self.get_jobs() + async def __aiter__(self): + jobs = await self.get_jobs() - return iter(jobs) + return aiter(jobs) - def __len__(self): - res = self.db.get(""" + async def get_length(self): + res = await self.db.get(""" WITH %s SELECT @@ -248,8 +245,8 @@ class Queue(base.Object): return 0 - def get_jobs(self, limit=None): - jobs = self.backend.jobs._get_jobs(""" + async def get_jobs(self, limit=None): + jobs = await self.backend.jobs._get_jobs(""" WITH %s SELECT @@ -263,11 +260,11 @@ class Queue(base.Object): return list(jobs) - def get_jobs_for_builder(self, builder, limit=None): + async def get_jobs_for_builder(self, builder, limit=None): """ Returns all jobs that the given builder can process. """ - return self.backend.jobs._get_jobs(""" + return await self.backend.jobs._get_jobs(""" WITH %s SELECT @@ -410,8 +407,8 @@ class Job(base.DataObject): # Packages @lazy_property - def packages(self): - packages = self.backend.packages._get_packages(""" + async def packages(self): + packages = await self.backend.packages._get_packages(""" SELECT packages.* FROM diff --git a/src/buildservice/misc.py b/src/buildservice/misc.py index dec4dbc5..54427446 100644 --- a/src/buildservice/misc.py +++ b/src/buildservice/misc.py @@ -54,13 +54,13 @@ def format_size(s): s /= 1024 -def group(items, key): +async def group(items, key): """ This function takes some iterable and returns it grouped by key. 
""" result = {} - for item in items: + async for item in items: if callable(key): value = key(item) else: diff --git a/src/buildservice/packages.py b/src/buildservice/packages.py index d0d71408..3f24edb4 100644 --- a/src/buildservice/packages.py +++ b/src/buildservice/packages.py @@ -22,17 +22,11 @@ from .errors import * log = logging.getLogger("pbs.packages") class Packages(base.Object): - def _get_package(self, query, *args): - res = self.db.get(query, *args) + async def _get_packages(self, *args, **kwargs): + return await self.db.fetch_many(Package, *args, **kwargs) - if res: - return Package(self.backend, res.id, data=res) - - def _get_packages(self, query, *args): - res = self.db.query(query, *args) - - for row in res: - yield Package(self.backend, row.id, data=row) + async def _get_package(self, *args, **kwargs): + return await self.db.fetch_one(Package, *args, **kwargs) def get_list(self): """ @@ -61,12 +55,11 @@ class Packages(base.Object): """, "src", ) - def get_by_id(self, pkg_id): - return self._get_package("SELECT * FROM packages \ - WHERE id = %s", pkg_id) + async def get_by_id(self, id): + return await self._get_package("SELECT * FROM packages WHERE id = %s", id) - def get_by_uuid(self, uuid): - return self._get_package(""" + async def get_by_uuid(self, uuid): + return await self._get_package(""" SELECT * FROM @@ -86,7 +79,7 @@ class Packages(base.Object): path = buildid_to_path(buildid) # Search for the package containing this file - for package in self.search_by_filename(path, limit=1): + async for package in self.search_by_filename(path, limit=1): log.debug("Found debuginfo for %s in %s" % (buildid, package)) return package @@ -105,7 +98,7 @@ class Packages(base.Object): package = await asyncio.to_thread(archive.get_package) # Check if a package with this UUID exists - pkg = self.get_by_uuid(package.uuid) + pkg = await self.get_by_uuid(package.uuid) if pkg: return pkg @@ -113,13 +106,14 @@ class Packages(base.Object): # Find a matching 
distribution if not distro: - distro = self.backend.distros.get_by_tag(package.distribution) + distro = await self.backend.distros.get_by_tag(package.distribution) if not distro: log.error("Could not find distribution '%s'" % package.distribution) raise NoSuchDistroError(package.distribution) - pkg = self._get_package(""" + # Insert into database + pkg = await self._get_package(""" INSERT INTO packages ( @@ -192,13 +186,13 @@ class Packages(base.Object): return pkg - def search(self, q, limit=None): + async def search(self, q, limit=None): """ Searches for packages that do match the query. This function does not work for UUIDs or filenames. """ - packages = self._get_packages(""" + packages = await self._get_packages(""" WITH package_search_index AS ( -- Source packages SELECT @@ -270,11 +264,11 @@ class Packages(base.Object): return list(packages) - def search_by_filename(self, filename, limit=None): + async def search_by_filename(self, filename, limit=None): if "*" in filename: filename = filename.replace("*", "%") - packages = self._get_packages(""" + packages = await self._get_packages(""" SELECT DISTINCT ON (packages.name) packages.* @@ -293,7 +287,7 @@ class Packages(base.Object): ) else: - packages = self._get_packages(""" + packages = await self._get_packages(""" SELECT DISTINCT ON (packages.name) packages.* @@ -338,16 +332,16 @@ class Package(base.DataObject): log.info("Deleting package %s" % self) # Mark as deleted - self._set_attribute_now("deleted_at") + await self._set_attribute_now("deleted_at") if user: - self._set_attribute("deleted_by", user) + await self._set_attribute("deleted_by", user) # Unlink the payload if self.path: await self.backend.unlink(self.path) # Reset path - self._set_attribute("path", None) + await self._set_attribute("path", None) def can_be_deleted(self): """ @@ -509,7 +503,7 @@ class Package(base.DataObject): log.debug("Importing %s to %s..." 
% (self, path)) # Store the path - self._set_attribute("path", path) + await self._set_attribute("path", path) # Copy the file if it doesn't exist, yet if not await self.backend.exists(path): @@ -521,7 +515,7 @@ class Package(base.DataObject): # Fetch the filelist filelist = await asyncio.to_thread(lambda a: a.filelist, archive) - self.db.executemany(""" + await self.db.executemany(""" INSERT INTO package_files ( @@ -567,8 +561,8 @@ class Package(base.DataObject): ) @lazy_property - def builds(self): - builds = self.backend.builds._get_builds(""" + async def builds(self): + builds = await self.backend.builds._get_builds(""" SELECT * FROM @@ -592,21 +586,15 @@ class Package(base.DataObject): # Files - def _get_files(self, query, *args): - res = self.db.query(query, *args) - - for row in res: - yield File(self.backend, package=self, data=row) + def _get_files(self, *args, **kwargs): + return self.db.fetch_many(File, *args, package=self, **kwargs) - def _get_file(self, query, *args): - res = self.db.get(query, *args) - - if res: - return File(self.backend, package=self, data=res) + async def _get_file(self, *args, **kwargs): + return await self.db.fetch_one(File, *args, package=self, **kwargs) @lazy_property - def files(self): - files = self._get_files(""" + async def files(self): + return self._get_files(""" SELECT * FROM @@ -618,10 +606,8 @@ class Package(base.DataObject): """, self.id, ) - return list(files) - - def get_file(self, path): - return self._get_file(""" + async def get_file(self, path): + return await self._get_file(""" SELECT * FROM @@ -633,10 +619,10 @@ class Package(base.DataObject): """, self.id, path, ) - def get_debuginfo(self, buildid): + async def get_debuginfo(self, buildid): path = buildid_to_path(buildid) - return self.get_file(path) + return await self.get_file(path) # Open @@ -648,9 +634,8 @@ class Package(base.DataObject): class File(base.Object): - def init(self, package, data): - self.package = package - self.data = data + def init(self, 
data, package): + self.data, self.package = data, package def __str__(self): return self.path diff --git a/src/buildservice/releasemonitoring.py b/src/buildservice/releasemonitoring.py index 6cf05d7c..72dfeeb3 100644 --- a/src/buildservice/releasemonitoring.py +++ b/src/buildservice/releasemonitoring.py @@ -78,16 +78,15 @@ class BuildExistsError(Exception): class Monitorings(base.Object): baseurl = "https://release-monitoring.org" - @property - def api_key(self): - return self.settings.get("release-monitoring-api-key") - async def _request(self, method, url, data=None): body = {} + # Fetch the API key + api_key = await self.settings.get("release-monitoring-api-key") + # Authenticate to the API headers = { - "Authorization" : "Token %s" % self.api_key, + "Authorization" : "Token %s" % api_key, } # Compose the url @@ -120,14 +119,14 @@ class Monitorings(base.Object): return body - def _get_monitoring(self, query, *args, **kwargs): - return self.db.fetch_one(Monitoring, query, *args, **kwargs) - def _get_monitorings(self, query, *args, **kwargs): return self.db.fetch_many(Monitoring, query, *args, **kwargs) - def get_by_id(self, id): - return self._get_monitoring(""" + async def _get_monitoring(self, query, *args, **kwargs): + return await self.db.fetch_one(Monitoring, query, *args, **kwargs) + + async def get_by_id(self, id): + return await self._get_monitoring(""" SELECT * FROM @@ -137,8 +136,8 @@ class Monitorings(base.Object): """, id, ) - def get_by_distro_and_name(self, distro, name): - return self._get_monitoring(""" + async def get_by_distro_and_name(self, distro, name): + return await self._get_monitoring(""" SELECT * FROM @@ -154,7 +153,7 @@ class Monitorings(base.Object): async def create(self, distro, name, created_by, project_id, follow="stable", create_builds=True): - monitoring = self._get_monitoring(""" + monitoring = await self._get_monitoring(""" INSERT INTO release_monitorings ( @@ -171,6 +170,8 @@ class Monitorings(base.Object): RETURNING * """, 
distro, name, created_by, project_id, follow, create_builds, + + # Populate cache distro=distro, ) @@ -216,7 +217,7 @@ class Monitorings(base.Object): """, limit, ) - for monitoring in monitorings: + async for monitoring in monitorings: await monitoring.check() # Releases @@ -224,8 +225,8 @@ class Monitorings(base.Object): def _get_releases(self, query, *args, **kwargs): return self.db.fetch_many(Release, query, *args, **kwargs) - def _get_release(self, query, *args, **kwargs): - return self.db.fetch_one(Release, query, *args, **kwargs) + async def _get_release(self, query, *args, **kwargs): + return await self.db.fetch_one(Release, query, *args, **kwargs) class Monitoring(base.DataObject): @@ -239,11 +240,11 @@ class Monitoring(base.DataObject): return "/distros/%s/monitorings/%s" % (self.distro.slug, self.name) @lazy_property - def distro(self): + async def distro(self): """ The distribution """ - return self.backend.distros.get_by_id(self.data.distro_id) + return await self.backend.distros.get_by_id(self.data.distro_id) @property def name(self): @@ -290,9 +291,9 @@ class Monitoring(base.DataObject): async def delete(self, user=None): # Mark as deleted - self._set_attribute_now("deleted_at") + await self._set_attribute_now("deleted_at") if user: - self._set_attribute("deleted_by", user) + await self._set_attribute("deleted_by", user) # Delete all releases async with asyncio.TaskGroup() as tasks: @@ -309,9 +310,15 @@ class Monitoring(base.DataObject): # Fetch the current versions versions = await self._fetch_versions() - with self.db.transaction(): + # Fetch the latest release + # XXX ??? 
+ + # Fetch the latest build + latest_build = await self.get_latest_build() + + async with await self.db.transaction(): # Store timestamp of this check - self._set_attribute_now("last_check_at") + await self._set_attribute_now("last_check_at") try: if self.follow == "latest": @@ -319,7 +326,7 @@ elif self.follow == "stable": release = await self._follow_stable(versions) elif self.follow == "current-branch": - release = await self._follow_current_branch(versions) + release = await self._follow_current_branch(versions, build=latest_build) else: raise ValueError("Cannot handle follow: %s" % self.follow) @@ -349,34 +356,34 @@ # Parse the response as JSON and return it return database.Row(response) - async def _follow_stable(self, versions): + async def _follow_stable(self, versions, *, build): """ This will follow "stable" i.e. the latest stable version """ for version in versions.stable_versions: - return await self.create_release(version) + return await self.create_release(version, build=build) - async def _follow_latest(self, versions): + async def _follow_latest(self, versions, *, build): """ This will follow the latest version (including pre-releases) """ - return await self.create_release(versions.latest_version) + return await self.create_release(versions.latest_version, build=build) - async def _follow_current_branch(self, versions): + async def _follow_current_branch(self, versions, *, build): """ This will follow any minor releases in the same branch """ # We cannot perform this if there is no recent build - if not self.latest_build: + if not build: return # Find the next version next_version = self._find_next_version( - self.latest_build.pkg.evr, versions.stable_versions) + build.pkg.evr, versions.stable_versions) # Create a new release with the next version if next_version: - return await self.create_release(next_version) + return await self.create_release(next_version, build=build) def 
_find_next_version(self, current_version, available_versions): # Remove epoch @@ -429,16 +436,16 @@ class Monitoring(base.DataObject): return self.backend.monitorings._get_releases(query, *args, monitoring=self, **kwargs) - def _get_release(self, query, *args, **kwargs): - return self.backend.monitorings._get_release(query, *args, + async def _get_release(self, query, *args, **kwargs): + return await self.backend.monitorings._get_release(query, *args, monitoring=self, **kwargs) @property - def latest_release(self): + async def latest_release(self): """ Returns the latest release of this package """ - return self._get_release(""" + return await self._get_release(""" SELECT * FROM @@ -453,7 +460,7 @@ class Monitoring(base.DataObject): @lazy_property def releases(self): - releases = self._get_releases(""" + return self._get_releases(""" SELECT * FROM @@ -467,20 +474,20 @@ class Monitoring(base.DataObject): return list(releases) - def _release_exists(self, version): + async def _release_exists(self, version): """ Returns True if this version already exists """ - return version in [release.version for release in self.releases] + return version in [release.version async for release in self.releases] - async def create_release(self, version): + async def create_release(self, version, *, build): """ Creates a new release for this package """ # XXX Do we need to check whether we are going backwards? 
# Raise an error if the release already exists - if self._release_exists(version): + if await self._release_exists(version): raise ReleaseExistsError(version) # Raise an error if we already have a newer build @@ -489,7 +496,7 @@ class Monitoring(base.DataObject): log.info("%s: Creating new release %s" % (self, version)) - release = self._get_release(""" + release = await self._get_release(""" INSERT INTO release_monitoring_releases ( @@ -511,8 +518,7 @@ class Monitoring(base.DataObject): # Create a build if self.data.create_builds: await release._create_build( - build=self.latest_build, - owner=self.backend.users.pakfire, + build=build, owner=self.backend.users.pakfire, ) # Create a bug report @@ -537,11 +543,10 @@ class Monitoring(base.DataObject): return False - @lazy_property - def latest_build(self): - builds = self.distro.get_builds_by_name(self.name, limit=1) + async def get_latest_build(self): + distro = await self.distro - for build in builds: + async for build in distro.get_builds_by_name(self.name, limit=1): return build @@ -629,11 +634,11 @@ class Release(base.DataObject): { "name" : self.monitoring.name, "version" : self.version, - "url" : self.backend.url_to(self.build.url), + "url" : await self.backend.url_to(self.build.url), }, # Set the URL to point to the build - "url" : self.backend.url_to(self.build.url), + "url" : await self.backend.url_to(self.build.url), } # Create the bug diff --git a/src/buildservice/repository.py b/src/buildservice/repository.py index 61a7de17..105a551b 100644 --- a/src/buildservice/repository.py +++ b/src/buildservice/repository.py @@ -20,14 +20,14 @@ from .decorators import * log = logging.getLogger("pbs.repositories") class Repositories(base.Object): - def _get_repositories(self, query, *args, **kwargs): - return self.db.fetch_many(Repository, query, *args, **kwargs) + def _get_repositories(self, *args, **kwargs): + return self.db.fetch_many(Repository, *args, **kwargs) - def _get_repository(self, query, *args, 
**kwargs): - return self.db.fetch_one(Repository, query, *args, **kwargs) + async def _get_repository(self, *args, **kwargs): + return await self.db.fetch_one(Repository, *args, **kwargs) - def __iter__(self): - repositories = self._get_repositories(""" + async def __aiter__(self): + repositories = await self._get_repositories(""" SELECT * FROM @@ -39,14 +39,14 @@ class Repositories(base.Object): """, ) - return iter(repositories) + return aiter(repositories) @property - def mirrored(self): + async def mirrored(self): """ Lists all repositories that should be mirrored """ - repos = self._get_repositories(""" + repos = await self._get_repositories(""" SELECT * FROM @@ -65,7 +65,7 @@ class Repositories(base.Object): Creates a new repository """ # Generate a slug - slug = self._make_slug(name, owner=owner) + slug = await self._make_slug(name, owner=owner) # Generate a comment for the key comment = "%s - %s" % (distro, name) @@ -75,7 +75,7 @@ class Repositories(base.Object): # Create a key for this repository key = await self.backend.keys.create(comment=comment, user=owner) - repo = self._get_repository(""" + repo = await self._get_repository(""" INSERT INTO repositories ( @@ -101,17 +101,17 @@ class Repositories(base.Object): return repo - def _make_slug(self, name, owner=None): + async def _make_slug(self, name, owner=None): for i in misc.infinity(): slug = misc.normalize(name, iteration=i) - exists = self.db.get("SELECT 1 FROM repositories \ + exists = await self.db.get("SELECT 1 FROM repositories \ WHERE deleted_at IS NULL AND owner_id = %s AND slug = %s", owner, slug) if not exists: return slug - def get_by_id(self, repo_id): - return self._get_repository("SELECT * FROM repositories \ + async def get_by_id(self, repo_id): + return await self._get_repository("SELECT * FROM repositories \ WHERE id = %s", repo_id) async def write(self): @@ -539,7 +539,7 @@ class Repository(base.DataObject): return list(builds) def get_recent_builds(self, limit=None, offset=None): - 
builds = self.backend.builds._get_builds(""" + return self.backend.builds._get_builds(""" SELECT builds.* FROM @@ -561,10 +561,8 @@ class Repository(base.DataObject): """, self.id, limit, offset, ) - return list(builds) - - def get_added_at_for_build(self, build): - res = self.db.get(""" + async def get_added_at_for_build(self, build): + res = await self.db.get(""" SELECT added_at FROM @@ -601,11 +599,11 @@ class Repository(base.DataObject): return res.count or 0 - def get_builds_by_name(self, name): + async def get_builds_by_name(self, name): """ Returns an ordered list of all builds that match this name """ - builds = self.backend.builds._get_builds(""" + builds = await self.backend.builds._get_builds(""" SELECT builds.* FROM @@ -627,9 +625,9 @@ class Repository(base.DataObject): return list(builds) - def get_packages(self, arch): + async def get_packages(self, arch): if arch == "src": - packages = self.backend.packages._get_packages(""" + packages = await self.backend.packages._get_packages(""" SELECT packages.* FROM @@ -649,7 +647,7 @@ class Repository(base.DataObject): """, self.id, ) else: - packages = self.backend.packages._get_packages(""" + packages = await self.backend.packages._get_packages(""" SELECT packages.* FROM @@ -681,12 +679,12 @@ class Repository(base.DataObject): return list(packages) @property - def pending_jobs(self): + async def pending_jobs(self): """ Returns a list of all pending jobs that use this repository as their build repository. 
""" - return self.backend.jobs._get_jobs(""" + return await self.backend.jobs._get_jobs(""" SELECT jobs.* FROM @@ -717,8 +715,8 @@ class Repository(base.DataObject): # Stats @lazy_property - def size(self): - res = self.db.query(""" + async def size(self): + res = await self.db.query(""" SELECT packages.arch AS arch, SUM(packages.filesize) AS size @@ -750,8 +748,8 @@ class Repository(base.DataObject): return { row.arch : row.size for row in res if row.arch in self.distro.arches } @lazy_property - def total_size(self): - res = self.db.get(""" + async def total_size(self): + res = await self.db.get(""" WITH packages AS ( -- Source Packages SELECT @@ -1010,16 +1008,16 @@ class Repository(base.DataObject): Deletes this repository """ # Mark as deleted - self._set_attribute_now("deleted_at") + await self._set_attribute_now("deleted_at") if user: - self._set_attribute("deleted_by", user) + await self._set_attribute("deleted_by", user) # Delete the key if self.key: - self.key.delete() + await self.key.delete() # Remove all builds from this repository - self.db.execute(""" + await self.db.execute(""" UPDATE repository_builds SET diff --git a/src/buildservice/sessions.py b/src/buildservice/sessions.py index 0f4ec1be..68eba694 100644 --- a/src/buildservice/sessions.py +++ b/src/buildservice/sessions.py @@ -6,25 +6,19 @@ from . 
import misc from .decorators import * class Sessions(base.Object): - def _get_session(self, query, *args): - res = self.db.get(query, *args) + async def _get_sessions(self, *args, **kwargs): + return self.db.fetch_many(Session, *args, **kwargs) - if res: - return Session(self.backend, res.id, data=res) + async def _get_session(self, *args, **kwargs): + return self.db.fetch_one(Session, *args, **kwargs) - def _get_sessions(self, query, *args): - res = self.db.query(query, *args) - - for row in res: - yield Session(self.backend, row.id, data=row) - - def __iter__(self): - sessions = self._get_sessions("SELECT * FROM sessions \ + async def __aiter__(self): + sessions = await self._get_sessions("SELECT * FROM sessions \ WHERE valid_until >= NOW() ORDER BY valid_until DESC") - return iter(sessions) + return aiter(sessions) - def create(self, user, address, user_agent=None): + async def create(self, user, address, user_agent=None): """ Creates a new session in the data. @@ -33,11 +27,26 @@ class Sessions(base.Object): """ session_id = misc.generate_random_string(48) - return self._get_session("INSERT INTO sessions(session_id, user_id, address, user_agent) \ - VALUES(%s, %s, %s, %s) RETURNING *", session_id, user.id, address, user_agent) - - def get_by_session_id(self, session_id): - return self._get_session("SELECT * FROM sessions \ + return await self._get_session(""" + INSERT INTO + sessions + ( + session_id, + user_id, + address, + user_agent + ) + VALUES + ( + %s, %s, %s, %s + ) + RETURNING + * + """, session_id, user.id, address, user_agent, + ) + + async def get_by_session_id(self, session_id): + return await self._get_session("SELECT * FROM sessions \ WHERE session_id = %s AND valid_until >= NOW()", session_id) # Alias function @@ -47,8 +56,8 @@ class Sessions(base.Object): """ Deletes all sessions that are not valid any more """ - with self.db.transaction(): - self.db.execute("DELETE FROM sessions WHERE valid_until < CURRENT_TIMESTAMP") + async with 
self.db.transaction(): + await self.db.execute("DELETE FROM sessions WHERE valid_until < CURRENT_TIMESTAMP") class Session(base.DataObject): @@ -60,8 +69,8 @@ class Session(base.DataObject): return NotImplemented - def destroy(self): - self.db.execute("DELETE FROM sessions WHERE id = %s", self.id) + async def destroy(self): + await self.db.execute("DELETE FROM sessions WHERE id = %s", self.id) @property def session_id(self): diff --git a/src/buildservice/settings.py b/src/buildservice/settings.py index e71fde1b..2f6b4578 100644 --- a/src/buildservice/settings.py +++ b/src/buildservice/settings.py @@ -3,31 +3,31 @@ from . import base class Settings(base.Object): - def get(self, key, default=None): - res = self.db.get("SELECT v FROM settings WHERE k = %s", key) + async def get(self, key, default=None): + res = await self.db.get("SELECT v FROM settings WHERE k = %s", key) if res: return res.v return default - def get_int(self, key, default=None): - value = self.get(key, default) + async def get_int(self, key, default=None): + value = await self.get(key, default) try: return int(value) except ValueError: return None - def get_float(self, key, default=None): - value = self.get(key, default) + async def get_float(self, key, default=None): + value = await self.get(key, default) try: return float(value) except ValueError: return None - def set(self, key, value): - self.db.execute(""" + async def set(self, key, value): + await self.db.execute(""" INSERT INTO settings( k, @@ -44,3 +44,4 @@ class Settings(base.Object): settings.k = excluded.k """, key, value, ) + diff --git a/src/buildservice/sources.py b/src/buildservice/sources.py index 6dbec0e7..e1a48e32 100644 --- a/src/buildservice/sources.py +++ b/src/buildservice/sources.py @@ -33,7 +33,13 @@ VALID_TAGS = ( ) class Sources(base.Object): - def __iter__(self): + def _get_sources(self, query, *args, **kwargs): + return self.db.fetch_many(Source, query, *args, **kwargs) + + async def _get_source(self, query, *args, 
**kwargs): + return await self.db.fetch_one(Source, query, *args, **kwargs) + + def __aiter__(self): sources = self._get_sources(""" SELECT * @@ -46,16 +52,10 @@ class Sources(base.Object): """, ) - return iter(sources) - - def _get_sources(self, query, *args, **kwargs): - return self.db.fetch_many(Source, query, *args, **kwargs) - - def _get_source(self, query, *args, **kwargs): - return self.db.fetch_one(Source, query, *args, **kwargs) + return aiter(sources) - def get_by_id(self, id): - return self._get_source(""" + async def get_by_id(self, id): + return await self._get_source(""" SELECT * FROM @@ -65,8 +65,8 @@ class Sources(base.Object): """, id, ) - def get_by_slug(self, slug): - return self._get_source(""" + async def get_by_slug(self, slug): + return await self._get_source(""" SELECT * FROM @@ -78,12 +78,12 @@ class Sources(base.Object): """, slug, ) - def create(self, repo, name, url, user): + async def create(self, repo, name, url, user): # Make slug slug = self._make_slug(name) # Insert into the database - return self._get_source(""" + source = await self._get_source(""" INSERT INTO sources( name, @@ -97,8 +97,13 @@ class Sources(base.Object): RETURNING * """, name, url, user, repo, + + # Populate cache + repo=repo, ) + return source + def _make_slug(self, name): for i in misc.infinity(): slug = misc.normalize(name, iteration=i) @@ -115,11 +120,11 @@ class Sources(base.Object): def _get_commits(self, query, *args, **kwargs): return self.db.fetch_many(Commit, query, *args, **kwargs) - def _get_commit(self, query, *args, **kwargs): - return self.db.fetch_one(Commit, query, *args, **kwargs) + async def _get_commit(self, query, *args, **kwargs): + return await self.db.fetch_one(Commit, query, *args, **kwargs) - def get_commit_by_id(self, id): - return self._get_commit(""" + async def get_commit_by_id(self, id): + return await self._get_commit(""" SELECT * FROM @@ -149,8 +154,8 @@ class Sources(base.Object): def _get_jobs(self, query, *args, **kwargs): 
return self.db.fetch_many(Job, query, *args, **kwargs) - def _get_job(self, query, *args, **kwargs): - return self.db.fetch_one(Job, query, *args, **kwargs) + async def _get_job(self, query, *args, **kwargs): + return await self.db.fetch_one(Job, query, *args, **kwargs) @property def pending_jobs(self): @@ -182,7 +187,7 @@ class Sources(base.Object): Runs all unfinished jobs of all sources """ # Run jobs one after the other - for job in self.pending_jobs: + async for job in self.pending_jobs: await job.run() @@ -616,14 +621,14 @@ class Commit(base.DataObject): # Jobs - def _create_job(self, action, name): + async def _create_job(self, action, name): """ Creates a new job """ log.info("%s: %s: Created '%s' job for '%s'" \ % (self.source, self.revision, action, name)) - job = self.backend.sources._get_job(""" + job = await self.backend.sources._get_job(""" INSERT INTO source_commit_jobs ( diff --git a/src/buildservice/uploads.py b/src/buildservice/uploads.py index cfac0a34..2e0039e7 100644 --- a/src/buildservice/uploads.py +++ b/src/buildservice/uploads.py @@ -26,26 +26,20 @@ class UnsupportedDigestException(ValueError): pass class Uploads(base.Object): - def _get_upload(self, query, *args): - res = self.db.get(query, *args) + async def _get_uploads(self, *args, **kwargs): + return await self.db.fetch_many(Upload, *args, **kwargs) - if res: - return Upload(self.backend, res.id, data=res) + async def _get_upload(self, *args, **kwargs): + return await self.db.fetch_one(Upload, *args, **kwargs) - def _get_uploads(self, query, *args): - res = self.db.query(query, *args) - - for row in res: - yield Upload(self.backend, row.id, data=row) - - def __iter__(self): - uploads = self._get_uploads("SELECT * FROM uploads \ + async def __aiter__(self): + uploads = await self._get_uploads("SELECT * FROM uploads \ ORDER BY created_at DESC") - return iter(uploads) + return aiter(uploads) - def get_by_uuid(self, uuid): - return self._get_upload(""" + async def get_by_uuid(self, uuid): 
+ return await self._get_upload(""" SELECT * FROM @@ -81,7 +75,7 @@ class Uploads(base.Object): user.check_storage_quota(size) # Allocate a new temporary file - upload = self._get_upload(""" + upload = await self._get_upload(""" INSERT INTO uploads ( @@ -135,7 +129,7 @@ class Uploads(base.Object): async def cleanup(self): # Find all expired uploads - uploads = self._get_uploads(""" + uploads = await self._get_uploads(""" SELECT * FROM @@ -222,7 +216,7 @@ class Upload(base.DataObject): await self.backend.unlink(self.path) # Delete the upload from the database - self.db.execute("DELETE FROM uploads WHERE id = %s", self.id) + await self.db.execute("DELETE FROM uploads WHERE id = %s", self.id) @property def created_at(self): diff --git a/src/buildservice/users.py b/src/buildservice/users.py index 7ad74179..75773067 100644 --- a/src/buildservice/users.py +++ b/src/buildservice/users.py @@ -117,20 +117,14 @@ class Users(base.Object): return self.local.ldap - def _get_user(self, query, *args): - res = self.db.get(query, *args) - - if res: - return User(self.backend, res.id, data=res) - - def _get_users(self, query, *args): - res = self.db.query(query, *args) + def _get_users(self, *args, **kwargs): + return self.db.fetch_many(User, *args, **kwargs) - for row in res: - yield User(self.backend, row.id, data=row) + def _get_user(self, *args, **kwargs): + return self.db.fetch_one(User, *args, **kwargs) - def __iter__(self): - users = self._get_users(""" + async def __aiter__(self): + users = await self._get_users(""" SELECT * FROM @@ -142,20 +136,7 @@ class Users(base.Object): """, ) - return iter(users) - - def __len__(self): - res = self.db.get(""" - SELECT - COUNT(*) AS count - FROM - users - WHERE - deleted_at IS NULL - """, - ) - - return res.count + return aiter(users) def _ldap_query(self, query, attrlist=None, limit=0, search_base=None): search_base = self.backend.config.get("ldap", "base") @@ -216,7 +197,7 @@ class Users(base.Object): return results[0] - def 
create(self, name, notify=False, storage_quota=None, _attrs=None): + async def create(self, name, notify=False, storage_quota=None, _attrs=None): """ Creates a new user """ @@ -228,7 +209,8 @@ class Users(base.Object): if storage_quota is None: storage_quota = DEFAULT_STORAGE_QUOTA - user = self._get_user(""" + # Insert into database + user = await self._get_user(""" INSERT INTO users ( @@ -249,20 +231,19 @@ class Users(base.Object): # Send a welcome email if notify: - user._send_welcome_email() + await user._send_welcome_email() return user - def get_by_id(self, id): - return self._get_user("SELECT * FROM users \ - WHERE id = %s", id) + async def get_by_id(self, id): + return await self._get_user("SELECT * FROM users WHERE id = %s", id) - def get_by_name(self, name): + async def get_by_name(self, name): """ Fetch a user by its username """ # Try to find a local user - user = self._get_user(""" + user = await self._get_user(""" SELECT * FROM @@ -298,7 +279,7 @@ class Users(base.Object): # Create a new user return self.create(uid) - def get_by_email(self, mail): + async def get_by_email(self, mail): # Strip any excess stuff from the email address name, mail = email.utils.parseaddr(mail) @@ -330,16 +311,16 @@ class Users(base.Object): # Fetch the UID uid = res.get("uid")[0].decode() - return self.get_by_name(uid) + return await self.get_by_name(uid) - def _search_by_email(self, mails, include_missing=True): + async def _search_by_email(self, mails, include_missing=True): """ Takes a list of email addresses and returns all users that could be found """ users = [] for mail in mails: - user = self.get_by_email(mail) + user = await self.get_by_email(mail) # Include the search string if no user could be found if not user and include_missing: @@ -353,7 +334,7 @@ class Users(base.Object): return users - def search(self, q, limit=None): + async def search(self, q, limit=None): # Do nothing in test mode if self.backend.test: log.warning("Cannot search for users in test mode") 
@@ -374,7 +355,7 @@ class Users(base.Object): ) # Fetch users - users = self._get_users(""" + users = await self._get_users(""" SELECT * FROM @@ -402,11 +383,11 @@ class Users(base.Object): return user @property - def top(self): + async def top(self): """ Returns the top users (with the most builds in the last year) """ - users = self._get_users(""" + users = await self._get_users(""" SELECT DISTINCT users.*, COUNT(builds.id) AS _sort @@ -532,8 +513,8 @@ class User(base.DataObject): def name(self): return self.data.name - def delete(self): - self._set_attribute("deleted", True) + async def delete(self): + await self._set_attribute("deleted", True) # Destroy all sessions for session in self.sessions: @@ -579,19 +560,19 @@ class User(base.DataObject): self.email or "invalid@invalid.tld", )) - def send_email(self, *args, **kwargs): - return self.backend.messages.send_template( + async def send_email(self, *args, **kwargs): + return await self.backend.messages.send_template( *args, recipient=self, locale=self.locale, **kwargs, ) - def _send_welcome_email(self): + async def _send_welcome_email(self): """ Sends a welcome email to the user """ - self.send_email("users/messages/welcome.txt") + await self.send_email("users/messages/welcome.txt") def is_admin(self): return self.data.admin is True @@ -644,24 +625,6 @@ class User(base.DataObject): # No permission return False - @property - def sessions(self): - sessions = self.backend.sessions._get_sessions(""" - SELECT - * - FROM - sessions - WHERE - user_id = %s - AND - valid_until >= NOW() - ORDER BY - created_at - """, self.id, - ) - - return list(sessions) - # Bugzilla async def connect_to_bugzilla(self, api_key): @@ -854,14 +817,14 @@ class User(base.DataObject): # Builds - def get_builds(self, name=None, limit=None, offset=None): + async def get_builds(self, name=None, limit=None, offset=None): """ Returns builds by a certain user """ if name: - return self.get_builds_by_name(name, limit=limit, offset=offset) + return 
await self.get_builds_by_name(name, limit=limit, offset=offset) - builds = self.backend.builds._get_builds(""" + builds = await self.backend.builds._get_builds(""" SELECT * FROM @@ -883,11 +846,11 @@ class User(base.DataObject): return list(builds) - def get_builds_by_name(self, name, limit=None, offset=None): + async def get_builds_by_name(self, name, limit=None, offset=None): """ Fetches all builds matching name """ - builds = self.backend.builds._get_builds(""" + builds = await self.backend.builds._get_builds(""" SELECT builds.* FROM @@ -1154,11 +1117,11 @@ class UserPushSubscription(base.DataObject): def deleted_at(self): return self.data.deleted_at - def delete(self): + async def delete(self): """ Deletes this subscription """ - self._set_attribute_now("deleted_at") + await self._set_attribute_now("deleted_at") @property def endpoint(self): diff --git a/src/crontab/pakfire-build-service b/src/crontab/pakfire-build-service index 871fd5a3..5a8bfff0 100644 --- a/src/crontab/pakfire-build-service +++ b/src/crontab/pakfire-build-service @@ -1,4 +1,4 @@ MAILTO=pakfire@ipfire.org # Send queued emails once a minute -* * * * * _pakfire pakfire-build-service --logging=warning messages:queue:send +#* * * * * _pakfire pakfire-build-service --logging=warning messages:queue:send diff --git a/src/templates/distros/show.html b/src/templates/distros/show.html index 84f6ddee..7566cd6e 100644 --- a/src/templates/distros/show.html +++ b/src/templates/distros/show.html @@ -68,8 +68,8 @@

{{ _("Latest Release") }}

- {% if distro.latest_release %} - {% module ReleasesList([distro.latest_release]) %} + {% if latest_release %} + {% module ReleasesList([latest_release]) %} {% else %}

{{ _("No release, yet") }} @@ -86,24 +86,24 @@ {# Repositories #} - {% if distro.repos %} + {% if repos %}

{{ _("Repositories") }}

- {% module ReposList(distro.repos) %} + {% module ReposList(repos) %}
{% end %} {# Sources #} - {% if distro.sources %} + {% if sources %}

{{ _("Sources") }}

- {% module SourcesList(distro.sources) %} + {% module SourcesList(sources) %}
{% end %} diff --git a/src/templates/repos/modules/list.html b/src/templates/repos/modules/list.html index 556eb91f..a8c70082 100644 --- a/src/templates/repos/modules/list.html +++ b/src/templates/repos/modules/list.html @@ -1,6 +1,6 @@