self,
conninfo: str = "",
*,
+ connection_class: Type[AsyncConnection] = AsyncConnection,
configure: Optional[
Callable[[AsyncConnection], Awaitable[None]]
] = None,
"async pool not supported before Python 3.7"
)
+ self.connection_class = connection_class
self._configure = configure
self._reset = reset
async def _connect(self) -> AsyncConnection:
"""Return a new connection configured for the pool."""
- conn = await AsyncConnection.connect(self.conninfo, **self.kwargs)
+ conn = await self.connection_class.connect(
+ self.conninfo, **self.kwargs
+ )
conn._pool = self
if self._configure:
self,
conninfo: str = "",
*,
+ connection_class: Type[Connection] = Connection,
configure: Optional[Callable[[Connection], None]] = None,
reset: Optional[Callable[[Connection], None]] = None,
**kwargs: Any,
):
+ self.connection_class = connection_class
self._configure = configure
self._reset = reset
self._stats[self._CONNECTIONS_NUM] += 1
t0 = monotonic()
try:
- conn = Connection.connect(self.conninfo, **self.kwargs)
+ conn = self.connection_class.connect(self.conninfo, **self.kwargs)
except Exception:
self._stats[self._CONNECTIONS_ERRORS] += 1
raise
pool.ConnectionPool(dsn, minconn=4, maxconn=2)
+def test_connection_class(dsn):
+ class MyConn(psycopg3.Connection):
+ pass
+
+ with pool.ConnectionPool(dsn, connection_class=MyConn, minconn=1) as p:
+ with p.connection() as conn:
+ assert isinstance(conn, MyConn)
+
+
def test_kwargs(dsn):
with pool.ConnectionPool(dsn, kwargs={"autocommit": True}, minconn=1) as p:
with p.connection() as conn:
pool.AsyncConnectionPool(dsn, minconn=4, maxconn=2)
+async def test_connection_class(dsn):
+ class MyConn(psycopg3.AsyncConnection):
+ pass
+
+ async with pool.AsyncConnectionPool(
+ dsn, connection_class=MyConn, minconn=1
+ ) as p:
+ async with p.connection() as conn:
+ assert isinstance(conn, MyConn)
+
+
async def test_kwargs(dsn):
async with pool.AsyncConnectionPool(
dsn, kwargs={"autocommit": True}, minconn=1