]> git.ipfire.org Git - people/ms/libloc.git/commitdiff
database: Create a connection pool for async operation
authorMichael Tremer <michael.tremer@ipfire.org>
Thu, 7 Mar 2024 12:16:30 +0000 (12:16 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Thu, 7 Mar 2024 12:16:30 +0000 (12:16 +0000)
This is very useful when we want to run multiple tasks concurrently
using asyncio.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/python/location/database.py

index 82b2bfcca412c903e72581f53c9d14fcf54c660a..28ce20d385314e0a73422d0688d9f0ee5071ce06 100644 (file)
@@ -2,8 +2,10 @@
        A lightweight wrapper around psycopg3.
 """
 
+import asyncio
 import logging
 import psycopg
+import psycopg_pool
 import time
 
 # Setup logging
@@ -11,17 +13,77 @@ log = logging.getLogger("location.database")
 
 class Connection(object):
        def __init__(self, host, database, user=None, password=None):
+               # Stores connections assigned to tasks
+               self.__connections = {}
+
                # Create a connection pool
-               self.connection = psycopg.connect(
+               self.pool = psycopg_pool.ConnectionPool(
                        "postgresql://%s:%s@%s/%s" % (user, password, host, database),
 
-                       # Enable autocommit
-                       autocommit=True,
+                       # Callback to configure any new connections
+                       configure=self.__configure,
+
+                       # Set limits for min/max connections in the pool
+                       min_size=1,
+                       max_size=512,
 
-                       # Return any rows as dicts
-                       row_factory = psycopg.rows.dict_row,
+                       # Give clients up to one minute to retrieve a connection
+                       timeout=60,
+
+                       # Close connections after they have been idle for a few seconds
+                       max_idle=5,
                )
 
+       def __configure(self, conn):
+               """
+                       Configures any newly opened connections
+               """
+               # Enable autocommit
+               conn.autocommit = True
+
+               # Return any rows as dicts
+               conn.row_factory = psycopg.rows.dict_row
+
+       def connection(self, *args, **kwargs):
+               """
+                       Returns a connection from the pool
+               """
+               # Fetch the current task
+               task = asyncio.current_task()
+
+               assert task, "Could not determine task"
+
+               # Try returning the same connection to the same task
+               try:
+                       return self.__connections[task]
+               except KeyError:
+                       pass
+
+               # Fetch a new connection from the pool
+               conn = self.__connections[task] = self.pool.getconn(*args, **kwargs)
+
+               log.debug("Assigning database connection %s to %s" % (conn, task))
+
+               # When the task finishes, release the connection
+               task.add_done_callback(self.__release_connection)
+
+               return conn
+
+       def __release_connection(self, task):
+               # Retrieve the connection
+               try:
+                       conn = self.__connections[task]
+               except KeyError:
+                       return
+
+               log.debug("Releasing database connection %s of %s" % (conn, task))
+
+               # Delete it
+               del self.__connections[task]
+
+               # Return the connection back into the pool
+               self.pool.putconn(conn)
+
        def _execute(self, cursor, execute, query, parameters):
                # Store the time we started this query
                #t = time.monotonic()
@@ -44,7 +106,9 @@ class Connection(object):
                """
                        Returns a row list for the given query and parameters.
                """
-               with self.connection.cursor() as cursor:
+               conn = self.connection()
+
+               with conn.cursor() as cursor:
                        self._execute(cursor, cursor.execute, query, parameters or kwparameters)
 
                        return [Row(row) for row in cursor]
@@ -65,21 +129,27 @@ class Connection(object):
                """
                        Executes the given query.
                """
-               with self.connection.cursor() as cursor:
+               conn = self.connection()
+
+               with conn.cursor() as cursor:
                        self._execute(cursor, cursor.execute, query, parameters or kwparameters)
 
        def executemany(self, query, parameters):
                """
                        Executes the given query against all the given param sequences.
                """
-               with self.connection.cursor() as cursor:
+               conn = self.connection()
+
+               with conn.cursor() as cursor:
                        self._execute(cursor, cursor.executemany, query, parameters)
 
        def transaction(self):
                """
                        Creates a new transaction on the current tasks' connection
                """
-               return self.connection.transaction()
+               conn = self.connection()
+
+               return conn.transaction()
 
 
 class Row(dict):