- Add support for async `!reconnect_failed` callbacks in `AsyncConnectionPool`
(:ticket:`#520`).
- Make connection pool classes generic on the connection type (:ticket:`#559`).
-- Raise a warning if a pool is used relying on an implicit `!open=True` and the
+- Raise a warning if sync pools rely an implicit `!open=True` and the
pool context is not used. In the future the default will become `!False`
(:ticket:`#659`).
+- Raise a warning if async pools are opened in the constructor. In the future
+ it will become an error. (:ticket:`#659`).
psycopg_pool 3.1.9 (unreleased)
from time import monotonic
from random import random
from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING
-import warnings
from psycopg import errors as e
raise PoolClosed(f"the pool {self.name!r} is already closed")
else:
raise PoolClosed(f"the pool {self.name!r} is not open yet")
- elif self._open_implicit:
- warnings.warn(
- f"the default for the {type(self).__name__} 'open' parameter will"
- " become 'False' in a future release; please use open={True|False}"
- " explicitly or use the pool as context manager",
- DeprecationWarning,
- )
def _check_pool_putconn(self, conn: "BaseConnection[Any]") -> None:
pool = getattr(conn, "_pool", None)
from __future__ import annotations
import logging
+import warnings
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
self._stop_workers()
+ def _check_open_getconn(self) -> None:
+ super()._check_open_getconn()
+
+ if self._open_implicit:
+ self._open_implicit = False
+
+ warnings.warn(
+ f"the default for the {type(self).__name__} 'open' parameter"
+ + " will become 'False' in a future release. Please use"
+ + " open={True|False} explicitly, or use the pool as context"
+ + f" manager using: `with {type(self).__name__}(...) as pool: ...`",
+ DeprecationWarning,
+ )
+
def wait(self, timeout: float = 30.0) -> None:
"""
Wait for the pool to be full (with `min_size` connections) after creation.
from __future__ import annotations
import logging
+import warnings
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
num_workers=num_workers,
)
+ if True: # ASYNC
+ if open:
+ self._warn_open_async()
+
if open is None:
open = self._open_implicit = True
self._stop_workers()
+ def _check_open_getconn(self) -> None:
+ super()._check_open_getconn()
+
+ if self._open_implicit:
+ self._open_implicit = False
+
+ if True: # ASYNC
+ # If open was explicit, we already warned it in __init__
+ self._warn_open_async()
+ else:
+ warnings.warn(
+ f"the default for the {type(self).__name__} 'open' parameter"
+ + " will become 'False' in a future release. Please use"
+ + " open={True|False} explicitly, or use the pool as context"
+ + f" manager using: `with {type(self).__name__}(...) as pool: ...`",
+ DeprecationWarning,
+ )
+
+ if True: # ASYNC
+
+ def _warn_open_async(self) -> None:
+ warnings.warn(
+ f"opening the async pool {type(self).__name__} in the constructor"
+ " is deprecated and will not be supported anymore in a future"
+ " release. Please use `await pool.open()`, or use the pool as context"
+ f" manager using: `async with {type(self).__name__}(...) as pool: `...",
+ DeprecationWarning,
+ )
+
async def wait(self, timeout: float = 30.0) -> None:
"""
Wait for the pool to be full (with `min_size` connections) after creation.
def test_del_no_warning(dsn, recwarn):
- p = pool.ConnectionPool(dsn, min_size=2, open=True)
+ p = pool.ConnectionPool(dsn, min_size=2, open=False)
+ p.open()
with p.connection() as conn:
conn.execute("select 1")
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)
try:
- with pool.ConnectionPool(dsn, min_size=4, open=True) as p:
+ with pool.ConnectionPool(dsn, min_size=4) as p:
p.wait(timeout=2)
finally:
logger.removeHandler(handler)
async def test_del_no_warning(dsn, recwarn):
- p = pool.AsyncConnectionPool(dsn, min_size=2, open=True)
+ p = pool.AsyncConnectionPool(dsn, min_size=2, open=False)
+ await p.open()
async with p.connection() as conn:
await conn.execute("select 1")
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)
try:
- async with pool.AsyncConnectionPool(dsn, min_size=4, open=True) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=4) as p:
await p.wait(timeout=2)
finally:
logger.removeHandler(handler)
def test_cant_create_open_outside_loop(dsn):
- with pytest.raises(RuntimeError):
- pool.AsyncConnectionPool(dsn, open=True)
+ with pytest.warns(DeprecationWarning):
+ with pytest.raises(RuntimeError):
+ pool.AsyncConnectionPool(dsn, open=True)
@pytest.fixture
def test_create_warning(pool_cls, dsn):
- # No warning on explicit open
+ # No warning on explicit open for sync pool
p = pool_cls(dsn, open=True)
try:
with p.connection():
def test_closed_getconn(pool_cls, dsn):
- p = pool_cls(dsn, min_size=min_size(pool_cls), open=True)
+ p = pool_cls(dsn, min_size=min_size(pool_cls), open=False)
+ p.open()
assert not p.closed
with p.connection():
pass
def test_close_connection_on_pool_close(pool_cls, dsn):
- p = pool_cls(dsn, min_size=min_size(pool_cls), open=True)
+ p = pool_cls(dsn, min_size=min_size(pool_cls), open=False)
+ p.open()
with p.connection() as conn:
p.close()
assert conn.closed
e1 = Event()
e2 = Event()
- p = pool_cls(dsn, min_size=min_size(pool_cls), max_size=1, open=True)
- p.wait()
- success: List[str] = []
+ with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) as p:
+ p.wait()
+ success: List[str] = []
- t1 = spawn(w1)
- # Wait until w1 has received a connection
- e1.wait()
+ t1 = spawn(w1)
+ # Wait until w1 has received a connection
+ e1.wait()
- t2 = spawn(w2)
- # Wait until w2 is in the queue
- ensure_waiting(p)
- p.close()
+ t2 = spawn(w2)
+ # Wait until w2 is in the queue
+ ensure_waiting(p)
# Wait for the workers to finish
e2.set()
def test_open_no_op(pool_cls, dsn):
- p = pool_cls(dsn, open=True)
+ p = pool_cls(dsn, open=False)
+ p.open()
try:
assert not p.closed
p.open()
def test_reopen(pool_cls, dsn):
- p = pool_cls(dsn, open=True)
+ p = pool_cls(dsn, open=False)
+ p.open()
with p.connection() as conn:
conn.execute("select 1")
p.close()
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)
try:
- with pool_cls(dsn, min_size=min_size(pool_cls, 4), open=True) as p:
+ with pool_cls(dsn, min_size=min_size(pool_cls, 4)) as p:
p.wait(timeout=2)
finally:
logger.removeHandler(handler)
async def test_create_warning(pool_cls, dsn):
- # No warning on explicit open
- p = pool_cls(dsn, open=True)
- try:
- async with p.connection():
- pass
- finally:
- await p.close()
+ if True: # ASYNC
+ # warning on explicit open too on async
+ with pytest.warns(DeprecationWarning):
+ p = pool_cls(dsn, open=True)
+ await p.close()
+
+ else:
+ # No warning on explicit open for sync pool
+ p = pool_cls(dsn, open=True)
+ try:
+ async with p.connection():
+ pass
+ finally:
+ await p.close()
# No warning on explicit close
p = pool_cls(dsn, open=False)
async def test_closed_getconn(pool_cls, dsn):
- p = pool_cls(dsn, min_size=min_size(pool_cls), open=True)
+ p = pool_cls(dsn, min_size=min_size(pool_cls), open=False)
+ await p.open()
assert not p.closed
async with p.connection():
pass
async def test_close_connection_on_pool_close(pool_cls, dsn):
- p = pool_cls(dsn, min_size=min_size(pool_cls), open=True)
+ p = pool_cls(dsn, min_size=min_size(pool_cls), open=False)
+ await p.open()
async with p.connection() as conn:
await p.close()
assert conn.closed
e1 = AEvent()
e2 = AEvent()
- p = pool_cls(dsn, min_size=min_size(pool_cls), max_size=1, open=True)
- await p.wait()
- success: List[str] = []
+ async with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) as p:
+ await p.wait()
+ success: List[str] = []
- t1 = spawn(w1)
- # Wait until w1 has received a connection
- await e1.wait()
+ t1 = spawn(w1)
+ # Wait until w1 has received a connection
+ await e1.wait()
- t2 = spawn(w2)
- # Wait until w2 is in the queue
- await ensure_waiting(p)
- await p.close()
+ t2 = spawn(w2)
+ # Wait until w2 is in the queue
+ await ensure_waiting(p)
# Wait for the workers to finish
e2.set()
async def test_open_no_op(pool_cls, dsn):
- p = pool_cls(dsn, open=True)
+ p = pool_cls(dsn, open=False)
+ await p.open()
try:
assert not p.closed
await p.open()
async def test_reopen(pool_cls, dsn):
- p = pool_cls(dsn, open=True)
+ p = pool_cls(dsn, open=False)
+ await p.open()
async with p.connection() as conn:
await conn.execute("select 1")
await p.close()
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)
try:
- async with pool_cls(dsn, min_size=min_size(pool_cls, 4), open=True) as p:
+ async with pool_cls(dsn, min_size=min_size(pool_cls, 4)) as p:
await p.wait(timeout=2)
finally:
logger.removeHandler(handler)