as torndb.
"""
+import asyncio
import itertools
import logging
import psycopg
+import psycopg_pool
import time
from . import base
"""
def __init__(self, backend, host, database, user=None, password=None):
self.backend = backend
- self.host = host
- self.database = database
-
- self._db = None
- self._db_args = {
- "host" : host,
- "dbname" : database,
- "user" : user,
- "password" : password,
- "sslmode" : "require",
- }
- try:
- self.reconnect()
- except Exception:
- log.error("Cannot connect to database on %s", self.host, exc_info=True)
+ # Stores connections assigned to tasks
+ self.__connections = {}
- def __del__(self):
- self.close()
+ # Create a connection pool
+ self.pool = psycopg_pool.ConnectionPool(
+ "postgresql://%s:%s@%s/%s" % (user, password, host, database),
- def close(self):
+ # Callback to configure any new connections
+ configure=self.__configure,
+
+ # Set limits for min/max connections in the pool
+ min_size=4,
+ max_size=128,
+ )
+
+ def __configure(self, conn):
"""
- Closes this database connection.
+ Configures any newly opened connections
"""
- if getattr(self, "_db", None) is not None:
- self._db.close()
- self._db = None
+ # Enable autocommit
+ conn.autocommit = True
+
+ # Return any rows as dicts
+ conn.row_factory = psycopg.rows.dict_row
+
+ # Automatically convert DataObjects
+ conn.adapters.register_dumper(base.DataObject, base.DataObjectDumper)
- def reconnect(self):
+ def connection(self, *args, **kwargs):
"""
- Closes the existing database connection and re-opens it.
+ Returns a connection from the pool
"""
- self.close()
+ # Fetch the current task
+ task = asyncio.current_task()
- self._db = psycopg.connect(**self._db_args,
- cursor_factory=psycopg.ClientCursor)
- self._db.autocommit = True
+ assert task, "Could not determine task"
- # Automatically convert DataObjects
- self._db.adapters.register_dumper(base.DataObject, base.DataObjectDumper)
+ # Try returning the same connection to the same task
+ try:
+ return self.__connections[task]
+ except KeyError:
+ pass
+
+ # Fetch a new connection from the pool
+ conn = self.__connections[task] = self.pool.getconn(*args, **kwargs)
+
+ log.debug("Assigning database connection %s to %s" % (conn, task))
+
+ # When the task finishes, release the connection
+ task.add_done_callback(self.__release_connection)
+
+ return conn
+
+ def __release_connection(self, task):
+ # Retrieve the connection
+ try:
+ conn = self.__connections[task]
+ except KeyError:
+ return
+
+ log.debug("Releasing database connection %s of %s" % (conn, task))
+
+ # Delete it
+ del self.__connections[task]
+
+ # Return the connection back into the pool
+ self.pool.putconn(conn)
+
+ def _execute(self, cursor, execute, query, parameters):
+ # Store the time we started this query
+ t = time.monotonic()
+
+ try:
+ log.debug("Running SQL query %s" % (query % parameters))
+ except Exception:
+ pass
+
+ # Execute the query
+ execute(query, parameters)
+
+ # How long did this take?
+ elapsed = time.monotonic() - t
+
+ # Log the query time
+ log.debug(" Query time: %.2fms" % (elapsed * 1000))
def query(self, query, *parameters, **kwparameters):
"""
Returns a row list for the given query and parameters.
"""
- cursor = self._cursor()
- try:
- self._execute(cursor, query, parameters, kwparameters)
- column_names = [d[0] for d in cursor.description]
- return [Row(zip(column_names, row)) for row in cursor]
- finally:
- cursor.close()
+ conn = self.connection()
+
+ with conn.cursor() as cursor:
+ self._execute(cursor, cursor.execute, query, parameters or kwparameters)
+
+ return [Row(row) for row in cursor]
def get(self, query, *parameters, **kwparameters):
"""
"""
Executes the given query.
"""
- cursor = self._cursor()
- try:
- self._execute(cursor, query, parameters, kwparameters)
- finally:
- cursor.close()
+ conn = self.connection()
+
+ with conn.cursor() as cursor:
+ self._execute(cursor, cursor.execute, query, parameters or kwparameters)
def executemany(self, query, parameters):
"""
Executes the given query against all the given param sequences.
"""
- cursor = self._cursor()
-
- try:
- cursor.executemany(query, parameters)
- finally:
- cursor.close()
-
- def _ensure_connected(self):
- if self._db is None:
- self.reconnect()
-
- def _cursor(self):
- self._ensure_connected()
- return self._db.cursor()
-
- def _execute(self, cursor, query, parameters, kwparameters):
- log.debug(
- "Executing query: %s" % \
- cursor.mogrify(query, kwparameters or parameters),
- )
+ conn = self.connection()
- # Store the time when the query started
- t = time.monotonic()
-
- try:
- return cursor.execute(query, kwparameters or parameters)
-
- # Catch any errors
- except OperationalError:
- log.error("Error connecting to database on %s", self.host)
- self.close()
- raise
-
- # Log how long the query took
- finally:
- # Determine duration the query took
- d = time.monotonic() - t
-
- log.debug("Query took %.2fms" % (d * 1000.0))
+ with conn.cursor() as cursor:
+ self._execute(cursor, cursor.executemany, query, parameters)
def transaction(self):
- return Transaction(self)
+ """
+ Creates a new transaction on the current tasks' connection
+ """
+ conn = self.connection()
+
+ return conn.transaction()
def fetch_one(self, cls, query, *args, **kwargs):
"""
return self[name]
except KeyError:
raise AttributeError(name)
-
-
-class Transaction(object):
- def __init__(self, db):
- self.db = db
-
- self.db.execute("START TRANSACTION")
-
- def __enter__(self):
- return self
-
- def __exit__(self, exctype, excvalue, traceback):
- if exctype is not None:
- self.db.execute("ROLLBACK")
- else:
- self.db.execute("COMMIT")
-