Close #172.
Psycopg 3.0.6
^^^^^^^^^^^^^
+- Allow to use `Cursor.description` if the connection is closed
+ (:ticket:`#172`).
- Don't raise exceptions on `ServerCursor.close()` if the connection is closed
(:ticket:`#173`).
- Fail on `Connection.cursor()` if the connection is closed (:ticket:`#174`).
from operator import attrgetter
from . import errors as e
-from ._encodings import pgconn_encoding
if TYPE_CHECKING:
from .cursor import BaseCursor
if not fname:
raise e.InterfaceError(f"no name available for column {index}")
- self._name = fname.decode(pgconn_encoding(cursor._pgconn))
+ self._name = fname.decode(cursor._encoding)
self._data = ColumnData(
ftype=res.ftype(index),
__slots__ = """
_conn format _adapters arraysize _closed _results pgresult _pos
_iresult _rowcount _query _tx _last_query _row_factory _make_row
- _pgconn
+ _pgconn _encoding
__weakref__
""".split()
self._iresult = 0
self._rowcount = -1
self._query: Optional[PostgresQuery]
+ self._encoding = "utf-8"
if reset_query:
self._query = None
self._send_prepare(name, pgq)
(result,) = yield from execute(self._pgconn)
if result.status == ExecStatus.FATAL_ERROR:
- raise e.error_from_result(
- result, encoding=pgconn_encoding(self._pgconn)
- )
+ raise e.error_from_result(result, encoding=self._encoding)
self._send_query_prepared(name, pgq, binary=binary)
# run the query
raise e.InterfaceError("the cursor is closed")
self._reset()
+ self._encoding = pgconn_encoding(self._pgconn)
if not self._last_query or (self._last_query is not query):
self._last_query = None
self._tx = adapt.Transformer(self)
statuses = {res.status for res in results}
badstats = statuses.difference(self._status_ok)
if results[-1].status == ExecStatus.FATAL_ERROR:
- raise e.error_from_result(
- results[-1], encoding=pgconn_encoding(self._pgconn)
- )
+ raise e.error_from_result(results[-1], encoding=self._encoding)
elif statuses.intersection(self._status_copy):
raise e.ProgrammingError(
"COPY cannot be used with this method; use copy() insead"
if status in (ExecStatus.COPY_IN, ExecStatus.COPY_OUT):
return
elif status == ExecStatus.FATAL_ERROR:
- raise e.error_from_result(
- result, encoding=pgconn_encoding(self._pgconn)
- )
+ raise e.error_from_result(result, encoding=self._encoding)
else:
raise e.ProgrammingError(
"copy() should be used only with COPY ... TO STDOUT or COPY ..."
from .rows import Row, RowFactory, AsyncRowFactory
from .cursor import BaseCursor, Cursor, execute
from .cursor_async import AsyncCursor
-from ._encodings import pgconn_encoding
if TYPE_CHECKING:
from .connection import Connection
) -> PQGen[None]:
"""Generator implementing `ServerCursor.execute()`."""
- conn = cur._conn
- query = self._make_declare_statement(conn, query)
+ query = self._make_declare_statement(cur, query)
# If the cursor is being reused, the previous one must be closed.
if self.described:
yield from cur._start_query(query)
pgq = cur._convert_query(query, params)
cur._execute_send(pgq, no_pqexec=True)
- results = yield from execute(conn.pgconn)
+ results = yield from execute(cur._conn.pgconn)
if results[-1].status != pq.ExecStatus.COMMAND_OK:
cur._raise_from_results(results)
self, cur: BaseCursor[ConnectionType, Row]
) -> PQGen[None]:
conn = cur._conn
- conn.pgconn.send_describe_portal(
- self.name.encode(pgconn_encoding(conn.pgconn))
- )
+ conn.pgconn.send_describe_portal(self.name.encode(cur._encoding))
results = yield from execute(conn.pgconn)
cur._execute_results(results, format=self._format)
self.described = True
yield from cur._conn._exec_command(query)
def _make_declare_statement(
- self, conn: ConnectionType, query: Query
+ self, cur: BaseCursor[ConnectionType, Row], query: Query
) -> sql.Composable:
if isinstance(query, bytes):
- query = query.decode(pgconn_encoding(conn.pgconn))
+ query = query.decode(cur._encoding)
if not isinstance(query, sql.Composable):
query = sql.SQL(query)
assert cur.description == []
assert cur.fetchall() == [()]
+ def test_description_closed_connection(self, conn):
+ # If we have reasons to break this test we will (e.g. we really need
+ # the connection). In #172 it fails just by accident.
+ cur = conn.execute("select 1::int4 as foo")
+ conn.close()
+ assert len(cur.description) == 1
+ col = cur.description[0]
+ assert col.name == "foo"
+ assert col.type_code == 23
+
def test_str(conn):
cur = conn.cursor()