Only used to implement internal commands such as "commit", with eventual
arguments bound client-side. The cursor can do more complex stuff.
"""
- if self.pgconn.status != ConnStatus.OK:
- if self.pgconn.status == ConnStatus.BAD:
- raise e.OperationalError("the connection is closed")
- raise e.InterfaceError(
- f"cannot execute operations: the connection is"
- f" in status {self.pgconn.status}"
- )
+ self._check_connection_ok()
if isinstance(command, str):
command = command.encode(pgconn_encoding(self.pgconn))
)
return result
+ def _check_connection_ok(self) -> None:
+ if self.pgconn.status == ConnStatus.OK:
+ return
+
+ if self.pgconn.status == ConnStatus.BAD:
+ raise e.OperationalError("the connection is closed")
+ raise e.InterfaceError(
+ f"cannot execute operations: the connection is"
+ f" in status {self.pgconn.status}"
+ )
+
def _start_query(self) -> PQGen[None]:
"""Generator to start a transaction if necessary."""
if self._autocommit:
"""
Return a new cursor to send commands and queries to the connection.
"""
+ self._check_connection_ok()
+
if not row_factory:
row_factory = self.row_factory
binary: bool = False,
) -> Cursor[Row]:
"""Execute a query and return a cursor to read its results."""
- cur = self.cursor()
- if binary:
- cur.format = Format.BINARY
-
try:
+ cur = self.cursor()
+ if binary:
+ cur.format = Format.BINARY
+
return cur.execute(query, params, prepare=prepare)
+
except e.Error as ex:
raise ex.with_traceback(None)
"""
Return a new `AsyncCursor` to send commands and queries to the connection.
"""
+ self._check_connection_ok()
+
if not row_factory:
row_factory = self.row_factory
prepare: Optional[bool] = None,
binary: bool = False,
) -> AsyncCursor[Row]:
- cur = self.cursor()
- if binary:
- cur.format = Format.BINARY
-
try:
+ cur = self.cursor()
+ if binary:
+ cur.format = Format.BINARY
+
return await cur.execute(query, params, prepare=prepare)
+
except e.Error as ex:
raise ex.with_traceback(None)
def test_close(conn):
assert not conn.closed
assert not conn.broken
+
+ cur = conn.cursor()
+
conn.close()
assert conn.closed
assert not conn.broken
assert conn.pgconn.status == conn.ConnStatus.BAD
- cur = conn.cursor()
-
conn.close()
assert conn.closed
assert conn.pgconn.status == conn.ConnStatus.BAD
assert conn.broken
+def test_cursor_closed(conn):
+ conn.close()
+ with pytest.raises(psycopg.OperationalError):
+ with conn.cursor("foo"):
+ pass
+ with pytest.raises(psycopg.OperationalError):
+ conn.cursor()
+
+
def test_connection_warn_close(dsn, recwarn):
conn = Connection.connect(dsn)
conn.close()
async def test_close(aconn):
assert not aconn.closed
assert not aconn.broken
+
+ cur = aconn.cursor()
+
await aconn.close()
assert aconn.closed
assert not aconn.broken
assert aconn.pgconn.status == aconn.ConnStatus.BAD
- cur = aconn.cursor()
-
await aconn.close()
assert aconn.closed
assert aconn.pgconn.status == aconn.ConnStatus.BAD
assert aconn.broken
+async def test_cursor_closed(aconn):
+ await aconn.close()
+ with pytest.raises(psycopg.OperationalError):
+ async with aconn.cursor("foo"):
+ pass
+ aconn.cursor("foo")
+ with pytest.raises(psycopg.OperationalError):
+ aconn.cursor()
+
+
async def test_connection_warn_close(dsn, recwarn):
conn = await AsyncConnection.connect(dsn)
await conn.close()