exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
+ if self.closed:
+ return
+
if exc_type:
# try to rollback, but if there are problems (connection in a bad
# state) just warn without clobbering the exception bubbling up.
- if not self.closed:
- try:
- self.rollback()
- except Exception as exc2:
- warnings.warn(
- f"error rolling back the transaction on {self}: {exc2}",
- RuntimeWarning,
- )
+ try:
+ self.rollback()
+ except Exception as exc2:
+ warnings.warn(
+ f"error rolling back the transaction on {self}: {exc2}",
+ RuntimeWarning,
+ )
else:
self.commit()
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
+ if self.closed:
+ return
+
if exc_type:
# try to rollback, but if there are problems (connection in a bad
# state) just warn without clobbering the exception bubbling up.
- if not self.closed:
- try:
- await self.rollback()
- except Exception as exc2:
- warnings.warn(
- f"error rolling back the transaction on {self}: {exc2}",
- RuntimeWarning,
- )
+ try:
+ await self.rollback()
+ except Exception as exc2:
+ warnings.warn(
+ f"error rolling back the transaction on {self}: {exc2}",
+ RuntimeWarning,
+ )
else:
await self.commit()
def test_broken_reconnect(dsn):
with pool.ConnectionPool(dsn, minconn=1) as p:
- with pytest.raises(psycopg3.OperationalError):
- with p.connection() as conn:
- with conn.execute("select pg_backend_pid()") as cur:
- (pid1,) = cur.fetchone()
- conn.close()
+ with p.connection() as conn:
+ with conn.execute("select pg_backend_pid()") as cur:
+ (pid1,) = cur.fetchone()
+ conn.close()
with p.connection() as conn2:
with conn2.execute("select pg_backend_pid()") as cur:
async def test_broken_reconnect(dsn):
async with pool.AsyncConnectionPool(dsn, minconn=1) as p:
- with pytest.raises(psycopg3.OperationalError):
- async with p.connection() as conn:
- cur = await conn.execute("select pg_backend_pid()")
- (pid1,) = await cur.fetchone()
- await conn.close()
+ async with p.connection() as conn:
+ cur = await conn.execute("select pg_backend_pid()")
+ (pid1,) = await cur.fetchone()
+ await conn.close()
async with p.connection() as conn2:
cur = await conn2.execute("select pg_backend_pid()")
cur.execute("select * from textctx")
+def test_context_close(conn):
+ with conn:
+ conn.execute("select 1")
+ conn.close()
+
+
def test_context_rollback_no_clobber(conn, dsn, recwarn):
with pytest.raises(ZeroDivisionError):
with psycopg3.connect(dsn) as conn2:
await cur.execute("select * from textctx")
+async def test_context_close(aconn):
+ async with aconn:
+ await aconn.execute("select 1")
+ await aconn.close()
+
+
async def test_context_rollback_no_clobber(conn, dsn, recwarn):
with pytest.raises(ZeroDivisionError):
async with await psycopg3.AsyncConnection.connect(dsn) as conn2: