def close(self) -> None:
self.pgconn.finish()
+ @property
+ def closed(self) -> bool:
+ return self.pgconn.status == pq.ConnStatus.BAD
+
def cursor(
self, name: Optional[str] = None, binary: bool = False
) -> cursor.BaseCursor:
self.loaders: LoadersMap = {}
self._reset()
self.arraysize = 1
+ self._closed = False
+
+ def __del__(self):
+ self.close()
+
+ def close(self) -> None:
+ self._closed = True
+
+ @property
+ def closed(self) -> bool:
+ return self._closed
def _reset(self) -> None:
from .adapt import Transformer
def _execute_send(
self, query: Query, vars: Optional[Params]
) -> "QueryGen":
- # Implement part of execute() before waiting common to sync and async
+ """
+ Implement part of execute() before waiting common to sync and async
+ """
+ if self.closed:
+ raise e.OperationalError("the cursor is closed")
+
+ if self.conn.closed:
+ raise e.OperationalError("the connection is closed")
+
if self.conn.pgconn.status != ConnStatus.OK:
- if self.conn.pgconn.status == ConnStatus.BAD:
- raise e.InterfaceError(
- "cannot execute operations: the connection is closed"
- )
- else:
- raise e.InterfaceError(
- f"cannot execute operations: the connection is"
- f" in status {self.conn.pgconn.status}"
- )
+ raise e.InterfaceError(
+ f"cannot execute operations: the connection is"
+ f" in status {self.conn.pgconn.status}"
+ )
self._reset()
return self.conn._exec_gen(self.conn.pgconn)
def _execute_results(self, results: List[PGresult]) -> None:
- # Implement part of execute() after waiting common to sync and async
+ """
+ Implement part of execute() after waiting common to sync and async
+ """
if not results:
raise e.InternalError("got no result from the query")
loop.run_until_complete(AsyncConnection.connect("dbname=nosuchdb"))
+def test_close(pq, aconn):
+ assert not aconn.closed
+ aconn.close()
+ assert aconn.closed
+ aconn.close()
+ assert aconn.closed
+
+
def test_commit(loop, pq, aconn):
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")
res = aconn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) == b"1"
+ aconn.close()
+ with pytest.raises(psycopg3.OperationalError):
+ loop.run_until_complete(aconn.commit())
+
def test_rollback(loop, pq, aconn):
aconn.pgconn.exec_(b"drop table if exists foo")
res = aconn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) is None
+ aconn.close()
+ with pytest.raises(psycopg3.OperationalError):
+ loop.run_until_complete(aconn.rollback())
+
def test_get_encoding(aconn, loop):
cur = aconn.cursor()
-def test_execute_many(aconn, loop):
+import pytest
+import psycopg3
+
+
+def test_close(aconn, loop):
+ cur = aconn.cursor()
+ assert not cur.closed
+ cur.close()
+ assert cur.closed
+
+ with pytest.raises(psycopg3.OperationalError):
+ loop.run_until_complete(cur.execute("select 'foo'"))
+
+ cur.close()
+ assert cur.closed
+
+
+def test_execute_many_results(aconn, loop):
cur = aconn.cursor()
rv = loop.run_until_complete(cur.execute("select 'foo'; select 'bar'"))
assert rv is cur
Connection.connect("dbname=nosuchdb")
+def test_close(pq, conn):
+ assert not conn.closed
+ conn.close()
+ assert conn.closed
+ conn.close()
+ assert conn.closed
+
+
def test_commit(pq, conn):
conn.pgconn.exec_(b"drop table if exists foo")
conn.pgconn.exec_(b"create table foo (id int primary key)")
res = conn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) == b"1"
+ conn.close()
+ with pytest.raises(psycopg3.OperationalError):
+ conn.commit()
+
def test_rollback(pq, conn):
conn.pgconn.exec_(b"drop table if exists foo")
res = conn.pgconn.exec_(b"select id from foo where id = 1")
assert res.get_value(0, 0) is None
+ conn.close()
+ with pytest.raises(psycopg3.OperationalError):
+ conn.rollback()
+
def test_get_encoding(conn):
(enc,) = conn.cursor().execute("show client_encoding").fetchone()
import pytest
+import psycopg3
-def test_execute_many(conn):
+def test_close(conn):
+ cur = conn.cursor()
+ assert not cur.closed
+ cur.close()
+ assert cur.closed
+
+ with pytest.raises(psycopg3.OperationalError):
+ cur.execute("select 'foo'")
+
+ cur.close()
+ assert cur.closed
+
+
+def test_execute_many_results(conn):
cur = conn.cursor()
rv = cur.execute("select 'foo'; select 'bar'")
assert rv is cur