(pgres,) = self.wait(execute(self.pgconn))
if pgres.status != ExecStatus.COMMAND_OK:
raise e.OperationalError(
- f"error on begin: {pq.error_message(pgres)}"
+ "error on begin:"
+ f" {pq.error_message(pgres, encoding=self.codec.name)}"
)
def commit(self) -> None:
if pgres.status != ExecStatus.COMMAND_OK:
raise e.OperationalError(
f"error on {command.decode('utf8')}:"
- f" {pq.error_message(pgres)}"
+ f" {pq.error_message(pgres, encoding=self.codec.name)}"
)
@classmethod
gen = execute(self.pgconn)
(result,) = self.wait(gen)
if result.status != ExecStatus.TUPLES_OK:
- raise e.error_from_result(result)
+ raise e.error_from_result(result, encoding=self.codec.name)
def notifies(self) -> Generator[Optional[Notify], bool, None]:
decode = self.codec.decode
(pgres,) = await self.wait(execute(self.pgconn))
if pgres.status != ExecStatus.COMMAND_OK:
raise e.OperationalError(
- f"error on begin: {pq.error_message(pgres)}"
+ "error on begin:"
+ f" {pq.error_message(pgres, encoding=self.codec.name)}"
)
async def commit(self) -> None:
if pgres.status != ExecStatus.COMMAND_OK:
raise e.OperationalError(
f"error on {command.decode('utf8')}:"
- f" {pq.error_message(pgres)}"
+ f" {pq.error_message(pgres, encoding=self.codec.name)}"
)
@classmethod
gen = execute(self.pgconn)
(result,) = await self.wait(gen)
if result.status != ExecStatus.TUPLES_OK:
- raise e.error_from_result(result)
+ raise e.error_from_result(result, encoding=self.codec.name)
async def notifies(self) -> AsyncGenerator[Optional[Notify], bool]:
decode = self.codec.decode
state = result.error_field(DiagnosticField.SQLSTATE) or b""
cls = _class_for_state(state.decode("ascii"))
- return cls(pq.error_message(result), pgresult=result, encoding=encoding)
+ return cls(
+ pq.error_message(result, encoding=encoding),
+ pgresult=result,
+ encoding=encoding,
+ )
def _class_for_state(sqlstate: str) -> Type[Error]:
from typing import cast, NamedTuple, Optional, Union
from ..errors import OperationalError
-from .enums import DiagnosticField
+from .enums import DiagnosticField, ConnStatus
from .proto import PGconn, PGresult
+from .encodings import py_codecs
class PQerror(OperationalError):
atttypmod: int
-def error_message(obj: Union[PGconn, PGresult]) -> str:
+def error_message(obj: Union[PGconn, PGresult], encoding: str = "utf8") -> str:
"""
Return an error message from a PGconn or PGresult.
- The return value is a str (unlike pq data which is usually bytes).
+ The return value is a str (unlike pq data which is usually bytes): use
+ the connection encoding if available, otherwise the *encoding* parameter
+ as a fallback for decoding. Don't raise exception on decode errors.
+
"""
bmsg: bytes
if hasattr(obj, "error_field"):
+ # obj is a PGresult
obj = cast(PGresult, obj)
bmsg = obj.error_field(DiagnosticField.MESSAGE_PRIMARY) or b""
elif hasattr(obj, "error_message"):
# obj is a PGconn
+ obj = cast(PGconn, obj)
+ if obj.status == ConnStatus.OK:
+ encoding = py_codecs.get(
+ obj.parameter_status(b"client_encoding"), "utf8"
+ )
bmsg = obj.error_message
# strip severity and whitespaces
)
if bmsg:
- msg = bmsg.decode(
- "utf8", "replace"
- ) # TODO: or in connection encoding?
+ msg = bmsg.decode(encoding, "replace")
else:
msg = "no details available"
assert "NULL" in pq.error_message(pgconn)
+def test_error_message_encoding(pgconn):
+ res = pgconn.exec_(b"set client_encoding to latin9")
+ assert res.status == pq.ExecStatus.COMMAND_OK
+
+ res = pgconn.exec_('select 1 from "foo\u20acbar"'.encode("latin9"))
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+
+ msg = pq.error_message(pgconn)
+ assert "foo\u20acbar" in msg
+
+ msg = pq.error_message(res)
+ assert "foo\ufffdbar" in msg
+
+ msg = pq.error_message(res, encoding="latin9")
+ assert "foo\u20acbar" in msg
+
+ msg = pq.error_message(res, encoding="ascii")
+ assert "foo\ufffdbar" in msg
+
+
def test_make_empty_result(pgconn):
pgconn.exec_(b"wat")
res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR)