.. autoattribute:: closed
:annotation: bool
+ .. autoattribute:: broken
+ :annotation: bool
+
.. method:: cursor(*, binary: bool = False, row_factory: Optional[RowFactory] = None) -> Cursor
.. method:: cursor(name: str, *, binary: bool = False, row_factory: Optional[RowFactory] = None) -> ServerCursor
# only a begin/commit and not a savepoint.
self._savepoints: List[str] = []
+ self._closed = False # closed by an explicit close()
self._prepared: PrepareManager = PrepareManager()
wself = ref(self)
-
pgconn.notice_handler = partial(BaseConnection._notice_handler, wself)
pgconn.notify_handler = partial(BaseConnection._notify_handler, wself)
@property
def closed(self) -> bool:
- """`True` if the connection is closed."""
+ """`!True` if the connection is closed."""
return self.pgconn.status == ConnStatus.BAD
+ @property
+ def broken(self) -> bool:
+ """
+ `!True` if the connection was interrupted.
+
+ A broken connection is always `closed`, but wasn't closed in a clean
+ way, such as using `close()` or a ``with`` block.
+ """
+ return self.pgconn.status == ConnStatus.BAD and not self._closed
+
@property
def autocommit(self) -> bool:
"""The autocommit state of the connection."""
def close(self) -> None:
"""Close the database connection."""
+ if self.closed:
+ return
+ self._closed = True
self.pgconn.finish()
@overload
await self.close()
async def close(self) -> None:
+ if self.closed:
+ return
+ self._closed = True
self.pgconn.finish()
@overload
def test_close(conn):
assert not conn.closed
+ assert not conn.broken
conn.close()
assert conn.closed
+ assert not conn.broken
assert conn.pgconn.status == conn.ConnStatus.BAD
cur = conn.cursor()
cur.execute("select 1")
+def test_broken(conn):
+ with pytest.raises(psycopg3.OperationalError):
+ conn.execute(
+ "select pg_terminate_backend(%s)", [conn.pgconn.backend_pid]
+ )
+ assert conn.closed
+ assert conn.broken
+ conn.close()
+ assert conn.closed
+ assert conn.broken
+
+
def test_connection_warn_close(dsn, recwarn):
conn = Connection.connect(dsn)
conn.close()
cur.execute("create table textctx ()")
assert conn.closed
+ assert not conn.broken
with psycopg3.connect(dsn) as conn:
with conn.cursor() as cur:
1 / 0
assert conn.closed
+ assert not conn.broken
with psycopg3.connect(dsn) as conn:
with conn.cursor() as cur:
async def test_close(aconn):
assert not aconn.closed
+ assert not aconn.broken
await aconn.close()
assert aconn.closed
+ assert not aconn.broken
assert aconn.pgconn.status == aconn.ConnStatus.BAD
cur = aconn.cursor()
await cur.execute("select 1")
+async def test_broken(aconn):
+ with pytest.raises(psycopg3.OperationalError):
+ await aconn.execute(
+ "select pg_terminate_backend(%s)", [aconn.pgconn.backend_pid]
+ )
+ assert aconn.closed
+ assert aconn.broken
+ await aconn.close()
+ assert aconn.closed
+ assert aconn.broken
+
+
async def test_connection_warn_close(dsn, recwarn):
conn = await AsyncConnection.connect(dsn)
await conn.close()
await cur.execute("create table textctx ()")
assert aconn.closed
+ assert not aconn.broken
async with await psycopg3.AsyncConnection.connect(dsn) as aconn:
async with aconn.cursor() as cur:
1 / 0
assert aconn.closed
+ assert not aconn.broken
async with await psycopg3.AsyncConnection.connect(dsn) as aconn:
async with aconn.cursor() as cur: