# Run the task. Make sure don't die in the attempt.
try:
await task.run()
- except Exception as e:
+ except Exception as ex:
logger.warning(
- "task run %s failed: %s: %s", task, e.__class__.__name__, e
+ "task run %s failed: %s: %s",
+ task,
+ ex.__class__.__name__,
+ ex,
)
async def wait(self, timeout: float = 30.0) -> None:
else:
await self._add_to_pool(conn)
- async def configure(self, conn: AsyncConnection) -> None:
- """Configure a connection after creation."""
- if self._configure:
- await self._configure(conn)
-
def reconnect_failed(self) -> None:
"""
Called when reconnection failed for longer than `reconnect_timeout`.
async def _connect(self) -> AsyncConnection:
"""Return a new connection configured for the pool."""
conn = await AsyncConnection.connect(self.conninfo, **self.kwargs)
- await self.configure(conn)
conn._pool = self
+
+ if self._configure:
+ await self._configure(conn)
+ status = conn.pgconn.transaction_status
+ if status != TransactionStatus.IDLE:
+ nstatus = TransactionStatus(status).name
+ raise e.ProgrammingError(
+ f"connection left in status {nstatus} by configure function"
+ f" {self._configure}: discarded"
+ )
+
# Set an expiry date, with some randomness to avoid mass reconnection
conn._expire_at = monotonic() + self._jitter(
self.max_lifetime, -0.05, 0.0
try:
conn = await self._connect()
- except Exception as e:
- logger.warning(f"error connecting in {self.name!r}: {e}")
+ except Exception as ex:
+ logger.warning(f"error connecting in {self.name!r}: {ex}")
if attempt.time_to_give_up(now):
logger.warning(
"reconnection attempt in pool %r failed after %s sec",
logger.warning("rolling back returned connection: %s", conn)
try:
await conn.rollback()
- except Exception as e:
+ except Exception as ex:
logger.warning(
"rollback failed: %s: %s. Discarding connection %s",
- e.__class__.__name__,
- e,
+ ex.__class__.__name__,
+ ex,
conn,
)
await conn.close()
from contextlib import contextmanager
from collections import deque
+from .. import errors as e
from ..pq import TransactionStatus
from ..connection import Connection
configure: Optional[Callable[[Connection], None]] = None,
**kwargs: Any,
):
- self._configure: Callable[[Connection], None]
- self._configure = configure or (lambda conn: None)
+ self._configure = configure
self._lock = threading.RLock()
self._waiting: Deque["WaitingClient"] = deque()
else:
self._add_to_pool(conn)
- def configure(self, conn: Connection) -> None:
- """Configure a connection after creation."""
- self._configure(conn)
-
def reconnect_failed(self) -> None:
"""
Called when reconnection failed for longer than `reconnect_timeout`.
# Run the task. Make sure don't die in the attempt.
try:
task.run()
- except Exception as e:
+ except Exception as ex:
logger.warning(
- "task run %s failed: %s: %s", task, e.__class__.__name__, e
+ "task run %s failed: %s: %s",
+ task,
+ ex.__class__.__name__,
+ ex,
)
def _connect(self) -> Connection:
"""Return a new connection configured for the pool."""
conn = Connection.connect(self.conninfo, **self.kwargs)
- self.configure(conn)
conn._pool = self
+
+ if self._configure:
+ self._configure(conn)
+ status = conn.pgconn.transaction_status
+ if status != TransactionStatus.IDLE:
+ nstatus = TransactionStatus(status).name
+ raise e.ProgrammingError(
+ f"connection left in status {nstatus} by configure function"
+ f" {self._configure}: discarded"
+ )
+
# Set an expiry date, with some randomness to avoid mass reconnection
conn._expire_at = monotonic() + self._jitter(
self.max_lifetime, -0.05, 0.0
try:
conn = self._connect()
- except Exception as e:
- logger.warning(f"error connecting in {self.name!r}: {e}")
+ except Exception as ex:
+ logger.warning(f"error connecting in {self.name!r}: {ex}")
if attempt.time_to_give_up(now):
logger.warning(
"reconnection attempt in pool %r failed after %s sec",
"""
status = conn.pgconn.transaction_status
if status == TransactionStatus.IDLE:
- return
+ pass
- if status in (TransactionStatus.INTRANS, TransactionStatus.INERROR):
+ elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR):
# Connection returned with an active transaction
logger.warning("rolling back returned connection: %s", conn)
try:
conn.rollback()
- except Exception as e:
+ except Exception as ex:
logger.warning(
"rollback failed: %s: %s. Discarding connection %s",
- e.__class__.__name__,
- e,
+ ex.__class__.__name__,
+ ex,
conn,
)
conn.close()
def configure(conn):
nonlocal inits
inits += 1
- conn.execute("set default_transaction_read_only to on")
+ with conn.transaction():
+ conn.execute("set default_transaction_read_only to on")
with pool.ConnectionPool(minconn=1, configure=configure) as p:
+ p.wait(timeout=1.0)
with p.connection() as conn:
assert inits == 1
res = conn.execute("show default_transaction_read_only")
assert res.fetchone()[0] == "on"
+@pytest.mark.slow
+def test_configure_badstate(dsn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg3.pool")
+
+ def configure(conn):
+ conn.execute("select 1")
+
+ with pool.ConnectionPool(minconn=1, configure=configure) as p:
+ with pytest.raises(pool.PoolTimeout):
+ p.wait(timeout=0.5)
+
+ assert caplog.records
+ assert "INTRANS" in caplog.records[0].message
+
+
@pytest.mark.slow
def test_configure_broken(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg3.pool")
def configure(conn):
- conn.execute("WAT")
+ with conn.transaction():
+ conn.execute("WAT")
with pool.ConnectionPool(minconn=1, configure=configure) as p:
with pytest.raises(pool.PoolTimeout):
async def configure(conn):
nonlocal inits
inits += 1
- await conn.execute("set default_transaction_read_only to on")
+ async with conn.transaction():
+ await conn.execute("set default_transaction_read_only to on")
async with pool.AsyncConnectionPool(minconn=1, configure=configure) as p:
+ await p.wait(timeout=1.0)
async with p.connection() as conn:
assert inits == 1
res = await conn.execute("show default_transaction_read_only")
assert (await res.fetchone())[0] == "on"
+@pytest.mark.slow
+async def test_configure_badstate(dsn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg3.pool")
+
+ async def configure(conn):
+ await conn.execute("select 1")
+
+ async with pool.AsyncConnectionPool(minconn=1, configure=configure) as p:
+ with pytest.raises(pool.PoolTimeout):
+ await p.wait(timeout=0.5)
+
+ assert caplog.records
+ assert "INTRANS" in caplog.records[0].message
+
+
@pytest.mark.slow
async def test_configure_broken(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg3.pool")
async def configure(conn):
- await conn.execute("WAT")
+ async with conn.transaction():
+ await conn.execute("WAT")
async with pool.AsyncConnectionPool(minconn=1, configure=configure) as p:
with pytest.raises(pool.PoolTimeout):