from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
+ # Define a custom connection class that overrides the close() method
class MyConnection(psycopg.Connection):
def close(self):
if pool := getattr(self, "_pool", None):
# close the connection for real.
super().close()
+ # Create a connection pool that uses the custom connection class
mypool = psycopg_pool.ConnectionPool(
connection_class=MyConnection,
conninfo="postgresql://scott:tiger@localhost/test",
max_idle=60, # Maximum idle time (seconds) for unused pool connections
)
+ # Create an engine that uses the connection pool to get a connection
engine = create_engine(
url="postgresql+psycopg://", # Only need the dialect now
poolclass=NullPool, # Disable SQLAlchemy's default connection pool
creator=mypool.getconn, # Use Psycopg 3 connection pool to obtain connections
)
-Usage of ``psycopg_pool.AsyncConnectionPool`` is similar, but it requires the
-following ``psycopg.AsyncConnection`` subclass instead:
+Here is an example that uses ``psycopg_pool.AsyncConnectionPool``::
- class MyConnection(psycopg.AsyncConnection):
+ import psycopg
+ import psycopg_pool
+ from sqlalchemy.ext.asyncio import create_async_engine
+ from sqlalchemy.pool import NullPool
+
+ # Define a custom connection class that overrides the close() method
+ class MyAsyncConnection(psycopg.AsyncConnection):
async def close(self):
if pool := getattr(self, "_pool", None):
# Connection currently checked out from its pool;
# close the connection for real.
await super().close()
+ async def my_async_code():
+ # Create a connection pool that uses the custom connection class
+ # (must be created inside an async event loop)
+ mypool = psycopg_pool.AsyncConnectionPool(
+ connection_class=MyAsyncConnection,
+ conninfo="postgresql://scott:tiger@localhost/test",
+ min_size=1, # Minimum pool size
+ max_size=5, # Maximum pool size
+ max_idle=60, # Maximum idle time (seconds) for unused pool connections
+ )
+
+ # Define a callable that awaits a connection
+ async def get_connection():
+ return await mypool.getconn()
+
+ # Create an engine that uses the connection pool to get a connection
+ engine = create_async_engine(
+ url="postgresql+psycopg://", # Only need the dialect now
+ poolclass=NullPool, # Disable SQLAlchemy's default connection pool
+ async_creator=get_connection, # Use Psycopg 3 connection pool to obtain connections
+ )
+
The resulting engine may then be used normally. Internally, Psycopg 3 handles
connection pooling::