From: Michael Tremer Date: Thu, 7 Mar 2024 12:16:30 +0000 (+0000) Subject: database: Create a connection pool for async operation X-Git-Tag: 0.9.18~96 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=804e209b2359ac9ba9678ff281a36bb1dae06a41;p=location%2Flibloc.git database: Create a connection pool for async operation This is very useful when we want to run multiple tasks concurrently using asyncio. Signed-off-by: Michael Tremer --- diff --git a/src/python/location/database.py b/src/python/location/database.py index 82b2bfc..28ce20d 100644 --- a/src/python/location/database.py +++ b/src/python/location/database.py @@ -2,8 +2,10 @@ A lightweight wrapper around psycopg3. """ +import asyncio import logging import psycopg +import psycopg_pool import time # Setup logging @@ -11,17 +13,77 @@ log = logging.getLogger("location.database") class Connection(object): def __init__(self, host, database, user=None, password=None): + # Stores connections assigned to tasks + self.__connections = {} + # Create a connection pool - self.connection = psycopg.connect( + self.pool = psycopg_pool.ConnectionPool( "postgresql://%s:%s@%s/%s" % (user, password, host, database), - # Enable autocommit - autocommit=True, + # Callback to configure any new connections + configure=self.__configure, + + # Set limits for min/max connections in the pool + min_size=1, + max_size=512, - # Return any rows as dicts - row_factory = psycopg.rows.dict_row, + # Give clients up to one minute to retrieve a connection + timeout=60, + + # Close connections after they have been idle for a few seconds + max_idle=5, ) + def __configure(self, conn): + """ + Configures any newly opened connections + """ + # Enable autocommit + conn.autocommit = True + + # Return any rows as dicts + conn.row_factory = psycopg.rows.dict_row + + def connection(self, *args, **kwargs): + """ + Returns a connection from the pool + """ + # Fetch the current task + task = asyncio.current_task() + + assert task, "Could not determine task" + + # Try returning the same connection to the same task + try: + return self.__connections[task] + except KeyError: + pass + + # Fetch a new connection from the pool + conn = self.__connections[task] = self.pool.getconn(*args, **kwargs) + + log.debug("Assigning database connection %s to %s" % (conn, task)) + + # When the task finishes, release the connection + task.add_done_callback(self.__release_connection) + + return conn + + def __release_connection(self, task): + # Retrieve the connection + try: + conn = self.__connections[task] + except KeyError: + return + + log.debug("Releasing database connection %s of %s" % (conn, task)) + + # Delete it + del self.__connections[task] + + # Return the connection back into the pool + self.pool.putconn(conn) + def _execute(self, cursor, execute, query, parameters): # Store the time we started this query #t = time.monotonic() @@ -44,7 +106,9 @@ class Connection(object): """ Returns a row list for the given query and parameters. """ - with self.connection.cursor() as cursor: + conn = self.connection() + + with conn.cursor() as cursor: self._execute(cursor, cursor.execute, query, parameters or kwparameters) return [Row(row) for row in cursor] @@ -65,21 +129,27 @@ class Connection(object): """ Executes the given query. """ - with self.connection.cursor() as cursor: + conn = self.connection() + + with conn.cursor() as cursor: self._execute(cursor, cursor.execute, query, parameters or kwparameters) def executemany(self, query, parameters): """ Executes the given query against all the given param sequences. """ - with self.connection.cursor() as cursor: + conn = self.connection() + + with conn.cursor() as cursor: self._execute(cursor, cursor.executemany, query, parameters) def transaction(self): """ Creates a new transaction on the current tasks' connection """ - return self.connection.transaction() + conn = self.connection() + + return conn.transaction() class Row(dict):