git.ipfire.org Git - pbs.git/commitdiff
database: Replace the old database model with a connection pool
author		Michael Tremer <michael.tremer@ipfire.org>
		Fri, 21 Jul 2023 10:16:51 +0000 (10:16 +0000)
committer	Michael Tremer <michael.tremer@ipfire.org>
		Fri, 21 Jul 2023 10:16:51 +0000 (10:16 +0000)
This patch introduces a connection pool for the database. When a function
executes a query, a connection is handed to the (asyncio) task and stays
with that task until the task finishes.

That way, multiple concurrent tasks (even when they run in the same
thread) each hold an exclusive connection to the database, which avoids
overwriting or committing transactions that belong to other tasks.
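
As a rough illustration of that behaviour (not part of this patch; the
hostname, credentials and module path are placeholders): two tasks running
concurrently on the same event loop each receive their own connection from
the pool, and repeated calls from the same task return the same connection.

    import asyncio

    from buildservice.database import Connection

    async def worker(db, name):
        # Each task gets its own connection; calling db.connection() again
        # from the same task returns the same object until the task ends.
        conn = db.connection()
        print(name, "uses connection", id(conn))

    async def main():
        # Placeholder backend/credentials - adjust for a real setup
        db = Connection(None, "localhost", "pbs", user="pbs", password="secret")

        # Two tasks in the same thread, but two separate connections
        await asyncio.gather(worker(db, "task-1"), worker(db, "task-2"))

    asyncio.run(main())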

Additionally, transactions can now be nested, since psycopg's transaction
handling is more advanced than the previous Transaction class.
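
For reference, nesting with plain psycopg 3 (connection string and table are
placeholders, nothing pbs-specific): an inner transaction() block is
implemented as a SAVEPOINT, so it can be rolled back without aborting the
outer transaction.

    import psycopg

    with psycopg.connect("dbname=pbs") as conn:
        with conn.transaction():                # outer transaction
            conn.execute("INSERT INTO builds (name) VALUES (%s)", ("outer",))

            try:
                with conn.transaction():        # nested, uses a savepoint
                    conn.execute("INSERT INTO builds (name) VALUES (%s)", ("inner",))
                    raise RuntimeError("roll back only the inner block")
            except RuntimeError:
                pass

        # "outer" has been committed, "inner" was rolled back to the savepoint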

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
configure.ac
src/buildservice/database.py

index 8da87f16386d338764a935caa1be8d315dfd3ff6..edeb78282f09055f91fe695e0ba448c98ca2563c 100644 (file)
@@ -88,6 +88,7 @@ AX_PYTHON_MODULE([kerberos], [fatal])
 AX_PYTHON_MODULE([location], [fatal])
 AX_PYTHON_MODULE([markdown], [fatal])
 AX_PYTHON_MODULE([psycopg], [fatal])
+AX_PYTHON_MODULE([psycopg_pool], [fatal])
 AX_PYTHON_MODULE([pygments], [fatal])
 AX_PYTHON_MODULE([systemd.journal], [fatal])
 
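psycopg_pool is packaged separately from psycopg 3, which is why it gets its
own fatal check here. AX_PYTHON_MODULE essentially asserts that the module can
be imported by the configured Python interpreter; roughly (the pip hint is an
assumption about how the package is usually installed):

    import importlib
    import sys

    try:
        importlib.import_module("psycopg_pool")
    except ImportError:
        sys.exit("configure: error: psycopg_pool is required "
                 "(e.g. pip install \"psycopg[pool]\")")
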
index ea62eadb1c2df7a571bfab3c92dace1758c4dafd..8660823ee21907b75347277d65ff88c265002032 100644 (file)
@@ -8,9 +8,11 @@
        as torndb.
 """
 
+import asyncio
 import itertools
 import logging
 import psycopg
+import psycopg_pool
 import time
 
 from . import base
@@ -37,58 +39,103 @@ class Connection(object):
        """
        def __init__(self, backend, host, database, user=None, password=None):
                self.backend = backend
-               self.host = host
-               self.database = database
-
-               self._db = None
-               self._db_args = {
-                       "host"     : host,
-                       "dbname"   : database,
-                       "user"     : user,
-                       "password" : password,
-                       "sslmode"  : "require",
-               }
 
-               try:
-                       self.reconnect()
-               except Exception:
-                       log.error("Cannot connect to database on %s", self.host, exc_info=True)
+               # Stores connections assigned to tasks
+               self.__connections = {}
 
-       def __del__(self):
-               self.close()
+               # Create a connection pool
+               self.pool = psycopg_pool.ConnectionPool(
+                       "postgresql://%s:%s@%s/%s" % (user, password, host, database),
 
-       def close(self):
+                       # Callback to configure any new connections
+                       configure=self.__configure,
+
+                       # Set limits for min/max connections in the pool
+                       min_size=4,
+                       max_size=128,
+               )
+
+       def __configure(self, conn):
                """
-                       Closes this database connection.
+                       Configures any newly opened connections
                """
-               if getattr(self, "_db", None) is not None:
-                       self._db.close()
-                       self._db = None
+               # Enable autocommit
+               conn.autocommit = True
+
+               # Return any rows as dicts
+               conn.row_factory = psycopg.rows.dict_row
+
+               # Automatically convert DataObjects
+               conn.adapters.register_dumper(base.DataObject, base.DataObjectDumper)
 
-       def reconnect(self):
+       def connection(self, *args, **kwargs):
                """
-                       Closes the existing database connection and re-opens it.
+                       Returns a connection from the pool
                """
-               self.close()
+               # Fetch the current task
+               task = asyncio.current_task()
 
-               self._db = psycopg.connect(**self._db_args,
-                       cursor_factory=psycopg.ClientCursor)
-               self._db.autocommit = True
+               assert task, "Could not determine task"
 
-               # Automatically convert DataObjects
-               self._db.adapters.register_dumper(base.DataObject, base.DataObjectDumper)
+               # Try returning the same connection to the same task
+               try:
+                       return self.__connections[task]
+               except KeyError:
+                       pass
+
+               # Fetch a new connection from the pool
+               conn = self.__connections[task] = self.pool.getconn(*args, **kwargs)
+
+               log.debug("Assigning database connection %s to %s" % (conn, task))
+
+               # When the task finishes, release the connection
+               task.add_done_callback(self.__release_connection)
+
+               return conn
+
+       def __release_connection(self, task):
+               # Retrieve the connection
+               try:
+                       conn = self.__connections[task]
+               except KeyError:
+                       return
+
+               log.debug("Releasing database connection %s of %s" % (conn, task))
+
+               # Delete it
+               del self.__connections[task]
+
+               # Return the connection back into the pool
+               self.pool.putconn(conn)
+
+       def _execute(self, cursor, execute, query, parameters):
+               # Store the time we started this query
+               t = time.monotonic()
+
+               try:
+                       log.debug("Running SQL query %s" % (query % parameters))
+               except Exception:
+                       pass
+
+               # Execute the query
+               execute(query, parameters)
+
+               # How long did this take?
+               elapsed = time.monotonic() - t
+
+               # Log the query time
+               log.debug("  Query time: %.2fms" % (elapsed * 1000))
 
        def query(self, query, *parameters, **kwparameters):
                """
                        Returns a row list for the given query and parameters.
                """
-               cursor = self._cursor()
-               try:
-                       self._execute(cursor, query, parameters, kwparameters)
-                       column_names = [d[0] for d in cursor.description]
-                       return [Row(zip(column_names, row)) for row in cursor]
-               finally:
-                       cursor.close()
+               conn = self.connection()
+
+               with conn.cursor() as cursor:
+                       self._execute(cursor, cursor.execute, query, parameters or kwparameters)
+
+                       return [Row(row) for row in cursor]
 
        def get(self, query, *parameters, **kwparameters):
                """
@@ -106,58 +153,27 @@ class Connection(object):
                """
                        Executes the given query.
                """
-               cursor = self._cursor()
-               try:
-                       self._execute(cursor, query, parameters, kwparameters)
-               finally:
-                       cursor.close()
+               conn = self.connection()
+
+               with conn.cursor() as cursor:
+                       self._execute(cursor, cursor.execute, query, parameters or kwparameters)
 
        def executemany(self, query, parameters):
                """
                        Executes the given query against all the given param sequences.
                """
-               cursor = self._cursor()
-
-               try:
-                       cursor.executemany(query, parameters)
-               finally:
-                       cursor.close()
-
-       def _ensure_connected(self):
-               if self._db is None:
-                       self.reconnect()
-
-       def _cursor(self):
-               self._ensure_connected()
-               return self._db.cursor()
-
-       def _execute(self, cursor, query, parameters, kwparameters):
-               log.debug(
-                               "Executing query: %s" % \
-                                               cursor.mogrify(query, kwparameters or parameters),
-               )
+               conn = self.connection()
 
-               # Store the time when the query started
-               t = time.monotonic()
-
-               try:
-                       return cursor.execute(query, kwparameters or parameters)
-
-               # Catch any errors
-               except OperationalError:
-                       log.error("Error connecting to database on %s", self.host)
-                       self.close()
-                       raise
-
-               # Log how long the query took
-               finally:
-                       # Determine duration the query took
-                       d = time.monotonic() - t
-
-                       log.debug("Query took %.2fms" % (d * 1000.0))
+               with conn.cursor() as cursor:
+                       self._execute(cursor, cursor.executemany, query, parameters)
 
        def transaction(self):
-               return Transaction(self)
+               """
+                       Creates a new transaction on the current task's connection
+               """
+               conn = self.connection()
+
+               return conn.transaction()
 
        def fetch_one(self, cls, query, *args, **kwargs):
                """
@@ -186,20 +202,3 @@ class Row(dict):
                        return self[name]
                except KeyError:
                        raise AttributeError(name)
-
-
-class Transaction(object):
-       def __init__(self, db):
-               self.db = db
-
-               self.db.execute("START TRANSACTION")
-
-       def __enter__(self):
-               return self
-
-       def __exit__(self, exctype, excvalue, traceback):
-               if exctype is not None:
-                       self.db.execute("ROLLBACK")
-               else:
-                       self.db.execute("COMMIT")
-
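
With the Transaction class removed, callers use the context manager returned
by Connection.transaction() instead, which commits when the block exits
normally and rolls back if an exception escapes. A hedged usage sketch (the
table, columns and helper are made up; it has to run inside an asyncio task
because connections are handed out per task):

    async def rename_build(db, build_id, name):
        # Replaces the explicit START TRANSACTION/COMMIT/ROLLBACK of the
        # removed Transaction class
        with db.transaction():
            db.execute("UPDATE builds SET name = %s WHERE id = %s", name, build_id)

            # All statements of this task run on the same pooled connection,
            # so they are part of the same transaction
            build = db.get("SELECT * FROM builds WHERE id = %s", build_id)

        # Rows are dicts that also allow attribute access
        return build.name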