The method is available on PGconn, PGconnCancel, PGresult.
The generic function error_message() is retained only because it was
documented so we don't want to drop it. Internally only the
get_error_message() method is used.
from ._compat import LiteralString, Self, TypeAlias, TypeVar
from .pq.misc import connection_summary
from ._pipeline import BasePipeline
-from ._encodings import pgconn_encoding
from ._preparing import PrepareManager
from ._connection_info import ConnectionInfo
if not (self and self._notice_handlers):
return
- diag = e.Diagnostic(res, pgconn_encoding(self.pgconn))
+ diag = e.Diagnostic(res, self.pgconn._encoding)
for cb in self._notice_handlers:
try:
cb(diag)
if not (self and self._notify_handlers):
return
- enc = pgconn_encoding(self.pgconn)
+ enc = self.pgconn._encoding
n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
for cb in self._notify_handlers:
cb(n)
self._check_connection_ok()
if isinstance(command, str):
- command = command.encode(pgconn_encoding(self.pgconn))
+ command = command.encode(self.pgconn._encoding)
elif isinstance(command, Composable):
command = command.as_bytes(self)
result = (yield from generators.execute(self.pgconn))[-1]
if result.status != COMMAND_OK and result.status != TUPLES_OK:
if result.status == FATAL_ERROR:
- raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
+ raise e.error_from_result(result, encoding=self.pgconn._encoding)
else:
raise e.InterfaceError(
f"unexpected result {pq.ExecStatus(result.status).name}"
result = (yield from generators.execute(self.pgconn))[-1]
if result.status != COMMAND_OK:
if result.status == FATAL_ERROR:
- raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
+ raise e.error_from_result(result, encoding=self.pgconn._encoding)
else:
raise e.InterfaceError(
f"unexpected result {pq.ExecStatus(result.status).name}"
from . import pq
from ._tz import get_tzinfo
-from ._encodings import pgconn_encoding
from .conninfo import make_conninfo
@property
def encoding(self) -> str:
"""The Python codec name of the connection's client encoding."""
- return pgconn_encoding(self.pgconn)
+ return self.pgconn._encoding
def _get_pgconn_attr(self, name: str) -> str:
value: bytes = getattr(self.pgconn, name)
from ._compat import Self
from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE, PREFER_FLUSH
from .generators import copy_to, copy_end
-from ._encodings import pgconn_encoding
from ._acompat import spawn, gather, Queue, Worker
if TYPE_CHECKING:
bmsg: bytes | None
if exc:
msg = f"error from Python: {type(exc).__qualname__} - {exc}"
- bmsg = msg.encode(pgconn_encoding(self._pgconn), "replace")
+ bmsg = msg.encode(self._pgconn._encoding, "replace")
else:
bmsg = None
from ._compat import Self
from ._copy_base import BaseCopy, MAX_BUFFER_SIZE, QUEUE_SIZE, PREFER_FLUSH
from .generators import copy_to, copy_end
-from ._encodings import pgconn_encoding
from ._acompat import aspawn, agather, AQueue, AWorker
if TYPE_CHECKING:
bmsg: bytes | None
if exc:
msg = f"error from Python: {type(exc).__qualname__} - {exc}"
- bmsg = msg.encode(pgconn_encoding(self._pgconn), "replace")
+ bmsg = msg.encode(self._pgconn._encoding, "replace")
else:
bmsg = None
from .abc import Buffer, ConnectionType, PQGen, Transformer
from .pq.misc import connection_summary
from ._cmodule import _psycopg
-from ._encodings import pgconn_encoding
from .generators import copy_from
if TYPE_CHECKING:
if binary:
self.formatter = BinaryFormatter(tx)
else:
- self.formatter = TextFormatter(tx, encoding=pgconn_encoding(self._pgconn))
+ self.formatter = TextFormatter(tx, encoding=self._pgconn._encoding)
self._finished = False
from ._column import Column
from .pq.misc import connection_summary
from ._queries import PostgresQuery, PostgresClientQuery
-from ._encodings import pgconn_encoding
from ._preparing import Prepare
from .generators import execute, fetch, send
@property
def _encoding(self) -> str:
- return pgconn_encoding(self._pgconn)
+ return self._pgconn._encoding
from ._compat import cache
if TYPE_CHECKING:
- from .pq.abc import PGconn
from ._connection_base import BaseConnection
OK = ConnStatus.OK
Default to utf8 if the connection has no encoding info.
"""
- if conn:
- return pgconn_encoding(conn.pgconn)
- else:
- return "utf-8"
-
-
-def pgconn_encoding(pgconn: PGconn) -> str:
- """
- Return the Python encoding name of a libpq connection.
-
- Default to utf8 if the connection has no encoding info.
- """
- if pgconn.status == OK:
- pgenc = pgconn.parameter_status(b"client_encoding") or b"UTF8"
- return pg2pyenc(pgenc)
- else:
- return "utf-8"
+ return conn.pgconn._encoding if conn else "utf-8"
def conninfo_encoding(conninfo: str) -> str:
from .abc import PipelineCommand, PQGen
from ._compat import Deque, Self, TypeAlias
from .pq.misc import connection_summary
-from ._encodings import pgconn_encoding
from .generators import pipeline_communicate, fetch_many, send
from ._capabilities import capabilities
if queued is None:
(result,) = results
if result.status == FATAL_ERROR:
- raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
+ raise e.error_from_result(result, encoding=self.pgconn._encoding)
elif result.status == PIPELINE_ABORTED:
raise e.PipelineAborted("pipeline aborted")
else:
from .conninfo import make_conninfo, conninfo_to_dict
from .conninfo import conninfo_attempts, timeout_from_conninfo
from ._pipeline import Pipeline
-from ._encodings import pgconn_encoding
from .generators import notifies
from .transaction import Transaction
from .cursor import Cursor
with self.lock:
ns = self.wait(notifies(self.pgconn), interval=interval)
if ns:
- enc = pgconn_encoding(self.pgconn)
+ enc = self.pgconn._encoding
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
from .conninfo import make_conninfo, conninfo_to_dict
from .conninfo import conninfo_attempts_async, timeout_from_conninfo
from ._pipeline import AsyncPipeline
-from ._encodings import pgconn_encoding
from .generators import notifies
from .transaction import AsyncTransaction
from .cursor_async import AsyncCursor
async with self.lock:
ns = await self.wait(notifies(self.pgconn), interval=interval)
if ns:
- enc = pgconn_encoding(self.pgconn)
+ enc = self.pgconn._encoding
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
pipeline_status: int = PipelineStatus.OFF.value
error_message: bytes = b""
+ _encoding: str = "utf-8"
server_version: int = 0
backend_pid: int = 0
def reset(self) -> NoReturn:
self._raise()
+ def get_error_message(self, encoding: str = "") -> str:
+ return "the conenection is closed"
+
def reset_start(self) -> NoReturn:
self._raise()
def error_from_result(result: PGresult, encoding: str = "utf-8") -> Error:
- from psycopg import pq
-
state = result.error_field(DiagnosticField.SQLSTATE) or b""
- cls = _class_for_state(state.decode("ascii"))
- return cls(
- pq.error_message(result, encoding=encoding),
- info=result,
- encoding=encoding,
- )
+ cls = _class_for_state(state.decode("utf-8", "replace"))
+ return cls(result.get_error_message(encoding), info=result, encoding=encoding)
def _is_pgresult(info: ErrorInfo) -> TypeGuard[PGresult]:
from .waiting import Wait, Ready
from ._compat import Deque
from ._cmodule import _psycopg
-from ._encodings import pgconn_encoding, conninfo_encoding
+from ._encodings import conninfo_encoding
OK = pq.ConnStatus.OK
BAD = pq.ConnStatus.BAD
if conn.status == BAD:
encoding = conninfo_encoding(conninfo)
raise e.OperationalError(
- f"connection is bad: {pq.error_message(conn, encoding=encoding)}",
+ f"connection is bad: {conn.get_error_message(encoding)}",
pgconn=conn,
)
elif status == POLL_FAILED:
encoding = conninfo_encoding(conninfo)
raise e.OperationalError(
- f"connection failed: {pq.error_message(conn, encoding=encoding)}",
+ f"connection failed: {conn.get_error_message(encoding)}",
pgconn=e.finish_pgconn(conn),
)
else:
elif status == POLL_WRITING:
yield cancel_conn.socket, WAIT_W
elif status == POLL_FAILED:
- msg = cancel_conn.error_message.decode("utf8", "replace")
- raise e.OperationalError(f"cancellation failed: {msg}")
+ raise e.OperationalError(
+ f"cancellation failed: {cancel_conn.get_error_message()}"
+ )
else:
raise e.InternalError(f"unexpected poll status: {status}")
raise e.ProgrammingError("you cannot mix COPY with other operations")
result = results[0]
if result.status != COMMAND_OK:
- encoding = pgconn_encoding(pgconn)
- raise e.error_from_result(result, encoding=encoding)
+ raise e.error_from_result(result, encoding=pgconn._encoding)
return result
# Retrieve the final result of copy
(result,) = yield from _fetch_many(pgconn)
if result.status != COMMAND_OK:
- encoding = pgconn_encoding(pgconn)
- raise e.error_from_result(result, encoding=encoding)
+ raise e.error_from_result(result, encoding=pgconn._encoding)
return result
@property
def error_message(self) -> bytes: ...
+ def get_error_message(self, encoding: str = ...) -> str: ...
+
+ @property
+ def _encoding(self) -> str: ...
+
@property
def server_version(self) -> int: ...
@property
def error_message(self) -> bytes: ...
+ def get_error_message(self, encoding: str = ...) -> str: ...
+
def error_field(self, fieldcode: int) -> bytes | None: ...
@property
@property
def error_message(self) -> bytes: ...
+ def get_error_message(self, encoding: str = ...) -> str: ...
+
def reset(self) -> None: ...
def finish(self) -> None: ...
import sys
import logging
import ctypes.util
-from typing import cast, NamedTuple
+from typing import NamedTuple
-from .abc import PGconn, PGresult
+from . import abc
from ._enums import ConnStatus, TransactionStatus, PipelineStatus
from .._compat import cache
-from .._encodings import pgconn_encoding
logger = logging.getLogger("psycopg.pq")
return libname
-def error_message(obj: PGconn | PGresult, encoding: str = "utf8") -> str:
+def error_message(
+ obj: abc.PGconn | abc.PGresult | abc.PGcancelConn, encoding: str = ""
+) -> str:
"""
- Return an error message from a `PGconn` or `PGresult`.
+ Return an error message from a `PGconn`, `PGresult`, `PGcancelConn`.
The return value is a `!str` (unlike pq data which is usually `!bytes`):
use the connection encoding if available, otherwise the `!encoding`
parameter as a fallback for decoding. Don't raise exceptions on decoding
errors.
-
"""
- bmsg: bytes
-
- if hasattr(obj, "error_field"):
- # obj is a PGresult
- obj = cast(PGresult, obj)
- bmsg = obj.error_message
-
- elif hasattr(obj, "error_message"):
- # obj is a PGconn
- if obj.status == OK:
- encoding = pgconn_encoding(obj)
- bmsg = obj.error_message
-
- else:
- raise TypeError(f"PGconn or PGresult expected, got {type(obj).__name__}")
+ # Note: this function is exposed by the pq module and was documented, therefore
+ # we are not going to remove it, but we don't use it internally.
- if bmsg:
- msg = strip_severity(bmsg.decode(encoding, "replace"))
- else:
- msg = "no details available"
-
- return msg
+ # Don't pass the encoding if not specified, because different classes have
+ # different defaults (conn has its own encoding. others default to utf8).
+ return obj.get_error_message(encoding) if encoding else obj.get_error_message()
# Possible prefixes to strip for error messages, in the known localizations.
return msg.strip()
-def connection_summary(pgconn: PGconn) -> str:
+def _clean_error_message(msg: bytes, encoding: str) -> str:
+ smsg = msg.decode(encoding, "replace")
+ if smsg:
+ return strip_severity(smsg)
+ else:
+ return "no error details available"
+
+
+def connection_summary(pgconn: abc.PGconn) -> str:
"""
Return summary information on a connection.
from typing import cast as t_cast, TYPE_CHECKING
from .. import errors as e
+from .._encodings import pg2pyenc
from . import _pq_ctypes as impl
from .misc import PGnotify, ConninfoOption, PGresAttDesc
-from .misc import error_message, connection_summary
-from ._enums import Format, ExecStatus, Trace
+from .misc import connection_summary, _clean_error_message
+from ._enums import ConnStatus, ExecStatus, Format, Trace
# Imported locally to call them from __del__ methods
from ._pq_ctypes import PQclear, PQfinish, PQfreeCancel, PQcancelFinish, PQstatus
logger = logging.getLogger("psycopg")
+OK = ConnStatus.OK
+
def version() -> int:
"""Return the version number of the libpq currently loaded.
def error_message(self) -> bytes:
return impl.PQerrorMessage(self._pgconn_ptr)
+ def get_error_message(self, encoding: str = "") -> str:
+ return _clean_error_message(self.error_message, encoding or self._encoding)
+
+ @property
+ def _encoding(self) -> str:
+ if self.status == OK:
+ pgenc = self.parameter_status(b"client_encoding") or b"UTF8"
+ return pg2pyenc(pgenc)
+ else:
+ return "utf-8"
+
@property
def protocol_version(self) -> int:
return self._call_int(impl.PQprotocolVersion)
self._ensure_pgconn()
rv = impl.PQexec(self._pgconn_ptr, command)
if not rv:
- raise e.OperationalError(f"executing query failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"executing query failed: {self.get_error_message()}"
+ )
return PGresult(rv)
def send_query(self, command: bytes) -> None:
raise TypeError(f"bytes expected, got {type(command)} instead")
self._ensure_pgconn()
if not impl.PQsendQuery(self._pgconn_ptr, command):
- raise e.OperationalError(f"sending query failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"sending query failed: {self.get_error_message()}"
+ )
def exec_params(
self,
self._ensure_pgconn()
rv = impl.PQexecParams(*args)
if not rv:
- raise e.OperationalError(f"executing query failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"executing query failed: {self.get_error_message()}"
+ )
return PGresult(rv)
def send_query_params(
self._ensure_pgconn()
if not impl.PQsendQueryParams(*args):
raise e.OperationalError(
- f"sending query and params failed: {error_message(self)}"
+ f"sending query and params failed: {self.get_error_message()}"
)
def send_prepare(
self._ensure_pgconn()
if not impl.PQsendPrepare(self._pgconn_ptr, name, command, nparams, atypes):
raise e.OperationalError(
- f"sending query and params failed: {error_message(self)}"
+ f"sending query and params failed: {self.get_error_message()}"
)
def send_query_prepared(
self._ensure_pgconn()
if not impl.PQsendQueryPrepared(*args):
raise e.OperationalError(
- f"sending prepared query failed: {error_message(self)}"
+ f"sending prepared query failed: {self.get_error_message()}"
)
def _query_params_args(
self._ensure_pgconn()
rv = impl.PQprepare(self._pgconn_ptr, name, command, nparams, atypes)
if not rv:
- raise e.OperationalError(f"preparing query failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"preparing query failed: {self.get_error_message()}"
+ )
return PGresult(rv)
def exec_prepared(
)
if not rv:
raise e.OperationalError(
- f"executing prepared query failed: {error_message(self)}"
+ f"executing prepared query failed: {self.get_error_message()}"
)
return PGresult(rv)
self._ensure_pgconn()
rv = impl.PQdescribePrepared(self._pgconn_ptr, name)
if not rv:
- raise e.OperationalError(f"describe prepared failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"describe prepared failed: {self.get_error_message()}"
+ )
return PGresult(rv)
def send_describe_prepared(self, name: bytes) -> None:
self._ensure_pgconn()
if not impl.PQsendDescribePrepared(self._pgconn_ptr, name):
raise e.OperationalError(
- f"sending describe prepared failed: {error_message(self)}"
+ f"sending describe prepared failed: {self.get_error_message()}"
)
def describe_portal(self, name: bytes) -> PGresult:
self._ensure_pgconn()
rv = impl.PQdescribePortal(self._pgconn_ptr, name)
if not rv:
- raise e.OperationalError(f"describe portal failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"describe portal failed: {self.get_error_message()}"
+ )
return PGresult(rv)
def send_describe_portal(self, name: bytes) -> None:
self._ensure_pgconn()
if not impl.PQsendDescribePortal(self._pgconn_ptr, name):
raise e.OperationalError(
- f"sending describe portal failed: {error_message(self)}"
+ f"sending describe portal failed: {self.get_error_message()}"
)
def close_prepared(self, name: bytes) -> PGresult:
self._ensure_pgconn()
rv = impl.PQclosePrepared(self._pgconn_ptr, name)
if not rv:
- raise e.OperationalError(f"close prepared failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"close prepared failed: {self.get_error_message()}"
+ )
return PGresult(rv)
def send_close_prepared(self, name: bytes) -> None:
self._ensure_pgconn()
if not impl.PQsendClosePrepared(self._pgconn_ptr, name):
raise e.OperationalError(
- f"sending close prepared failed: {error_message(self)}"
+ f"sending close prepared failed: {self.get_error_message()}"
)
def close_portal(self, name: bytes) -> PGresult:
self._ensure_pgconn()
rv = impl.PQclosePortal(self._pgconn_ptr, name)
if not rv:
- raise e.OperationalError(f"close portal failed: {error_message(self)}")
+ raise e.OperationalError(f"close portal failed: {self.get_error_message()}")
return PGresult(rv)
def send_close_portal(self, name: bytes) -> None:
self._ensure_pgconn()
if not impl.PQsendClosePortal(self._pgconn_ptr, name):
raise e.OperationalError(
- f"sending close portal failed: {error_message(self)}"
+ f"sending close portal failed: {self.get_error_message()}"
)
def get_result(self) -> PGresult | None:
def consume_input(self) -> None:
if 1 != impl.PQconsumeInput(self._pgconn_ptr):
- raise e.OperationalError(f"consuming input failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"consuming input failed: {self.get_error_message()}"
+ )
def is_busy(self) -> int:
return impl.PQisBusy(self._pgconn_ptr)
def nonblocking(self, arg: int) -> None:
if 0 > impl.PQsetnonblocking(self._pgconn_ptr, arg):
raise e.OperationalError(
- f"setting nonblocking failed: {error_message(self)}"
+ f"setting nonblocking failed: {self.get_error_message()}"
)
def flush(self) -> int:
raise e.OperationalError("flushing failed: the connection is closed")
rv: int = impl.PQflush(self._pgconn_ptr)
if rv < 0:
- raise e.OperationalError(f"flushing failed: {error_message(self)}")
+ raise e.OperationalError(f"flushing failed: {self.get_error_message()}")
return rv
def set_single_row_mode(self) -> None:
buffer = bytes(buffer)
rv = impl.PQputCopyData(self._pgconn_ptr, buffer, len(buffer))
if rv < 0:
- raise e.OperationalError(f"sending copy data failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"sending copy data failed: {self.get_error_message()}"
+ )
return rv
def put_copy_end(self, error: bytes | None = None) -> int:
rv = impl.PQputCopyEnd(self._pgconn_ptr, error)
if rv < 0:
- raise e.OperationalError(f"sending copy end failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"sending copy end failed: {self.get_error_message()}"
+ )
return rv
def get_copy_data(self, async_: int) -> tuple[int, memoryview]:
nbytes = impl.PQgetCopyData(self._pgconn_ptr, byref(buffer_ptr), async_)
if nbytes == -2:
raise e.OperationalError(
- f"receiving copy data failed: {error_message(self)}"
+ f"receiving copy data failed: {self.get_error_message()}"
)
if buffer_ptr:
# TODO: do it without copy
out = impl.PQencryptPasswordConn(self._pgconn_ptr, passwd, user, algorithm)
if not out:
raise e.OperationalError(
- f"password encryption failed: {error_message(self)}"
+ f"password encryption failed: {self.get_error_message()}"
)
rv = string_at(out)
res = impl.PQchangePassword(self._pgconn_ptr, user, passwd)
if impl.PQresultStatus(res) != ExecStatus.COMMAND_OK:
raise e.OperationalError(
- f"failed to change password change command: {error_message(self)}"
+ f"failed to change password change command: {self.get_error_message()}"
)
def make_empty_result(self, exec_status: int) -> PGresult:
mode.
"""
if impl.PQexitPipelineMode(self._pgconn_ptr) != 1:
- raise e.OperationalError(error_message(self))
+ raise e.OperationalError(self.get_error_message())
def pipeline_sync(self) -> None:
"""Mark a synchronization point in a pipeline.
:raises ~e.OperationalError: if the flush request failed.
"""
if impl.PQsendFlushRequest(self._pgconn_ptr) == 0:
- raise e.OperationalError(f"flush request failed: {error_message(self)}")
+ raise e.OperationalError(
+ f"flush request failed: {self.get_error_message()}"
+ )
def _call_bytes(self, func: Callable[[impl.PGconn_struct], bytes | None]) -> bytes:
"""
def error_message(self) -> bytes:
return impl.PQresultErrorMessage(self._pgresult_ptr)
+ def get_error_message(self, encoding: str = "utf-8") -> str:
+ return _clean_error_message(self.error_message, encoding)
+
def error_field(self, fieldcode: int) -> bytes | None:
return impl.PQresultErrorField(self._pgresult_ptr, fieldcode)
See :pq:`PQcancelStart` for details.
"""
if not impl.PQcancelStart(self.pgcancelconn_ptr):
- msg = self.error_message.decode("utf8", "replace")
- raise e.OperationalError(f"couldn't start cancellation: {msg}")
+ raise e.OperationalError(
+ f"couldn't start cancellation: {self.get_error_message()}"
+ )
def blocking(self) -> None:
"""Requests that the server abandons processing of the current command
See :pq:`PQcancelBlocking` for details.
"""
if not impl.PQcancelBlocking(self.pgcancelconn_ptr):
- msg = self.error_message.decode("utf8", "replace")
- raise e.OperationalError(f"couldn't start cancellation: {msg}")
+ raise e.OperationalError(
+ f"couldn't start cancellation: {self.get_error_message()}"
+ )
def poll(self) -> int:
self._ensure_pgcancelconn()
def error_message(self) -> bytes:
return impl.PQcancelErrorMessage(self.pgcancelconn_ptr)
+ def get_error_message(self, encoding: str = "utf-8") -> str:
+ return _clean_error_message(self.error_message, encoding)
+
def reset(self) -> None:
self._ensure_pgcancelconn()
impl.PQcancelReset(self.pgcancelconn_ptr)
out = impl.PQescapeLiteral(self.conn._pgconn_ptr, data, len(data))
if not out:
raise e.OperationalError(
- f"escape_literal failed: {error_message(self.conn)} bytes"
+ f"escape_literal failed: {self.conn.get_error_message()} bytes"
)
rv = string_at(out)
impl.PQfreemem(out)
out = impl.PQescapeIdentifier(self.conn._pgconn_ptr, data, len(data))
if not out:
raise e.OperationalError(
- f"escape_identifier failed: {error_message(self.conn)} bytes"
+ f"escape_identifier failed: {self.conn.get_error_message()} bytes"
)
rv = string_at(out)
impl.PQfreemem(out)
if error:
raise e.OperationalError(
- f"escape_string failed: {error_message(self.conn)} bytes"
+ f"escape_string failed: {self.conn.get_error_message()} bytes"
)
else:
from psycopg_c.pq cimport _buffer_as_string_and_size, Escaping
from psycopg import errors as e
-from psycopg.pq.misc import error_message
@cython.freelist(8)
from time import monotonic
from psycopg import errors as e
-from psycopg.pq import abc, error_message
+from psycopg.pq import abc
from psycopg.abc import PipelineCommand, PQGen
from psycopg._enums import Wait, Ready
from psycopg._compat import Deque
if conn_status == libpq.CONNECTION_BAD:
encoding = conninfo_encoding(conninfo)
raise e.OperationalError(
- f"connection is bad: {error_message(conn, encoding=encoding)}",
+ f"connection is bad: {conn.get_error_message(encoding)}",
pgconn=conn
)
elif poll_status == libpq.PGRES_POLLING_FAILED:
encoding = conninfo_encoding(conninfo)
raise e.OperationalError(
- f"connection failed: {error_message(conn, encoding=encoding)}",
+ f"connection failed: {conn.get_error_message(encoding)}",
pgconn=e.finish_pgconn(conn),
)
else:
elif status == libpq.PGRES_POLLING_WRITING:
yield libpq.PQcancelSocket(pgcancelconn_ptr), WAIT_W
elif status == libpq.PGRES_POLLING_FAILED:
- msg = cancel_conn.error_message.decode("utf8", "replace")
- raise e.OperationalError(f"cancellation failed: {msg}")
+ raise e.OperationalError(
+ f"cancellation failed: {cancel_conn.get_error_message()}"
+ )
else:
raise e.InternalError(f"unexpected poll status: {status}")
cires = libpq.PQconsumeInput(pgconn_ptr)
if 1 != cires:
raise e.OperationalError(
- f"consuming input failed: {error_message(pgconn)}")
+ f"consuming input failed: {pgconn.get_error_message()}")
def fetch_many(pq.PGconn pgconn) -> PQGen[list[PGresult]]:
if 1 != cires:
raise e.OperationalError(
- f"consuming input failed: {error_message(pgconn)}")
+ f"consuming input failed: {pgconn.get_error_message()}")
if not ibres:
break
while True:
cires = libpq.PQconsumeInput(pgconn_ptr)
if 1 != cires:
raise e.OperationalError(
- f"consuming input failed: {error_message(pgconn)}")
+ f"consuming input failed: {pgconn.get_error_message()}")
_consume_notifies(pgconn)
from psycopg import errors as e
from psycopg.pq import Format
-from psycopg.pq.misc import error_message
logger = logging.getLogger("psycopg")
out = libpq.PQescapeLiteral(self.conn._pgconn_ptr, ptr, length)
if out is NULL:
raise e.OperationalError(
- f"escape_literal failed: {error_message(self.conn)}"
+ f"escape_literal failed: {self.conn.get_error_message())}"
)
rv = out[:strlen(out)]
out = libpq.PQescapeIdentifier(self.conn._pgconn_ptr, ptr, length)
if out is NULL:
raise e.OperationalError(
- f"escape_identifier failed: {error_message(self.conn)}"
+ f"escape_identifier failed: {self.conn.get_error_message()}"
)
rv = out[:strlen(out)]
if error:
PyMem_Free(buf_out)
raise e.OperationalError(
- f"escape_string failed: {error_message(self.conn)}"
+ f"escape_string failed: {self.conn.get_error_message()}"
)
else:
"""
if not libpq.PQcancelStart(self.pgcancelconn_ptr):
raise e.OperationalError(
- f"couldn't send cancellation: {self.error_message}"
+ f"couldn't send cancellation: {self.get_error_message()}"
)
def blocking(self) -> None:
"""
if not libpq.PQcancelBlocking(self.pgcancelconn_ptr):
raise e.OperationalError(
- f"couldn't send cancellation: {self.error_message}"
+ f"couldn't send cancellation: {self.get_error_message()}"
)
def poll(self) -> int:
def error_message(self) -> bytes:
return libpq.PQcancelErrorMessage(self.pgcancelconn_ptr)
+ def get_error_message(self, encoding: str = "utf-8") -> str:
+ return _clean_error_message(self.error_message, encoding)
+
def reset(self) -> None:
self._ensure_pgcancelconn()
libpq.PQcancelReset(self.pgcancelconn_ptr)
import sys
+from psycopg._encodings import pg2pyenc
from psycopg.pq import Format as PqFormat, Trace, version_pretty
-from psycopg.pq.misc import PGnotify, connection_summary
+from psycopg.pq.misc import PGnotify, connection_summary, _clean_error_message
from psycopg.pq._enums import ExecStatus
from psycopg_c.pq cimport PQBuffer
def error_message(self) -> bytes:
return libpq.PQerrorMessage(self._pgconn_ptr)
+ def get_error_message(self, encoding: str = "") -> str:
+ return _clean_error_message(self.error_message, encoding or self._encoding)
+
+ @property
+ def _encoding(self) -> str:
+ cdef const char *pgenc
+ if libpq.PQstatus(self._pgconn_ptr) == libpq.CONNECTION_OK:
+ pgenc = libpq.PQparameterStatus(self._pgconn_ptr, b"client_encoding")
+ if pgenc is NULL:
+ pgenc = b"UTF8"
+ return pg2pyenc(pgenc)
+ else:
+ return "utf-8"
+
@property
def protocol_version(self) -> int:
return _call_int(self, libpq.PQprotocolVersion)
with nogil:
pgresult = libpq.PQexec(self._pgconn_ptr, command)
if pgresult is NULL:
- raise e.OperationalError(f"executing query failed: {error_message(self)}")
+ raise e.OperationalError(f"executing query failed: {self.get_error_message()}")
return PGresult._from_ptr(pgresult)
with nogil:
rv = libpq.PQsendQuery(self._pgconn_ptr, command)
if not rv:
- raise e.OperationalError(f"sending query failed: {error_message(self)}")
+ raise e.OperationalError(f"sending query failed: {self.get_error_message()}")
def exec_params(
self,
<const char *const *>cvalues, clengths, cformats, result_format)
_clear_query_params(ctypes, cvalues, clengths, cformats)
if pgresult is NULL:
- raise e.OperationalError(f"executing query failed: {error_message(self)}")
+ raise e.OperationalError(f"executing query failed: {self.get_error_message()}")
return PGresult._from_ptr(pgresult)
def send_query_params(
_clear_query_params(ctypes, cvalues, clengths, cformats)
if not rv:
raise e.OperationalError(
- f"sending query and params failed: {error_message(self)}"
+ f"sending query and params failed: {self.get_error_message()}"
)
def send_prepare(
PyMem_Free(atypes)
if not rv:
raise e.OperationalError(
- f"sending query and params failed: {error_message(self)}"
+ f"sending query and params failed: {self.get_error_message()}"
)
def send_query_prepared(
_clear_query_params(ctypes, cvalues, clengths, cformats)
if not rv:
raise e.OperationalError(
- f"sending prepared query failed: {error_message(self)}"
+ f"sending prepared query failed: {self.get_error_message()}"
)
def prepare(
self._pgconn_ptr, name, command, <int>nparams, atypes)
PyMem_Free(atypes)
if rv is NULL:
- raise e.OperationalError(f"preparing query failed: {error_message(self)}")
+ raise e.OperationalError(f"preparing query failed: {self.get_error_message()}")
return PGresult._from_ptr(rv)
def exec_prepared(
_clear_query_params(ctypes, cvalues, clengths, cformats)
if rv is NULL:
raise e.OperationalError(
- f"executing prepared query failed: {error_message(self)}"
+ f"executing prepared query failed: {self.get_error_message()}"
)
return PGresult._from_ptr(rv)
cdef libpq.PGresult *rv = libpq.PQdescribePrepared(self._pgconn_ptr, name)
if rv is NULL:
raise e.OperationalError(
- f"describe prepared failed: {error_message(self)}"
+ f"describe prepared failed: {self.get_error_message()}"
)
return PGresult._from_ptr(rv)
cdef int rv = libpq.PQsendDescribePrepared(self._pgconn_ptr, name)
if not rv:
raise e.OperationalError(
- f"sending describe prepared failed: {error_message(self)}"
+ f"sending describe prepared failed: {self.get_error_message()}"
)
def describe_portal(self, const char *name) -> PGresult:
cdef libpq.PGresult *rv = libpq.PQdescribePortal(self._pgconn_ptr, name)
if rv is NULL:
raise e.OperationalError(
- f"describe prepared failed: {error_message(self)}"
+ f"describe prepared failed: {self.get_error_message()}"
)
return PGresult._from_ptr(rv)
cdef int rv = libpq.PQsendDescribePortal(self._pgconn_ptr, name)
if not rv:
raise e.OperationalError(
- f"sending describe prepared failed: {error_message(self)}"
+ f"sending describe prepared failed: {self.get_error_message()}"
)
def close_prepared(self, const char *name) -> PGresult:
cdef libpq.PGresult *rv = libpq.PQclosePrepared(self._pgconn_ptr, name)
if rv is NULL:
raise e.OperationalError(
- f"close prepared failed: {error_message(self)}"
+ f"close prepared failed: {self.get_error_message()}"
)
return PGresult._from_ptr(rv)
cdef int rv = libpq.PQsendClosePrepared(self._pgconn_ptr, name)
if not rv:
raise e.OperationalError(
- f"sending close prepared failed: {error_message(self)}"
+ f"sending close prepared failed: {self.get_error_message()}"
)
def close_portal(self, const char *name) -> PGresult:
cdef libpq.PGresult *rv = libpq.PQclosePortal(self._pgconn_ptr, name)
if rv is NULL:
raise e.OperationalError(
- f"close prepared failed: {error_message(self)}"
+ f"close prepared failed: {self.get_error_message()}"
)
return PGresult._from_ptr(rv)
cdef int rv = libpq.PQsendClosePortal(self._pgconn_ptr, name)
if not rv:
raise e.OperationalError(
- f"sending close prepared failed: {error_message(self)}"
+ f"sending close prepared failed: {self.get_error_message()}"
)
def get_result(self) -> "PGresult" | None:
def consume_input(self) -> None:
if 1 != libpq.PQconsumeInput(self._pgconn_ptr):
- raise e.OperationalError(f"consuming input failed: {error_message(self)}")
+ raise e.OperationalError(f"consuming input failed: {self.get_error_message()}")
def is_busy(self) -> int:
cdef int rv
@nonblocking.setter
def nonblocking(self, int arg) -> None:
if 0 > libpq.PQsetnonblocking(self._pgconn_ptr, arg):
- raise e.OperationalError(f"setting nonblocking failed: {error_message(self)}")
+ raise e.OperationalError(f"setting nonblocking failed: {self.get_error_message()}")
cpdef int flush(self) except -1:
if self._pgconn_ptr == NULL:
raise e.OperationalError(f"flushing failed: the connection is closed")
cdef int rv = libpq.PQflush(self._pgconn_ptr)
if rv < 0:
- raise e.OperationalError(f"flushing failed: {error_message(self)}")
+ raise e.OperationalError(f"flushing failed: {self.get_error_message()}")
return rv
def set_single_row_mode(self) -> None:
_buffer_as_string_and_size(buffer, &cbuffer, &length)
rv = libpq.PQputCopyData(self._pgconn_ptr, cbuffer, <int>length)
if rv < 0:
- raise e.OperationalError(f"sending copy data failed: {error_message(self)}")
+ raise e.OperationalError(f"sending copy data failed: {self.get_error_message()}")
return rv
def put_copy_end(self, error: bytes | None = None) -> int:
cerr = PyBytes_AsString(error)
rv = libpq.PQputCopyEnd(self._pgconn_ptr, cerr)
if rv < 0:
- raise e.OperationalError(f"sending copy end failed: {error_message(self)}")
+ raise e.OperationalError(f"sending copy end failed: {self.get_error_message()}")
return rv
def get_copy_data(self, int async_) -> tuple[int, memoryview]:
cdef int nbytes
nbytes = libpq.PQgetCopyData(self._pgconn_ptr, &buffer_ptr, async_)
if nbytes == -2:
- raise e.OperationalError(f"receiving copy data failed: {error_message(self)}")
+ raise e.OperationalError(f"receiving copy data failed: {self.get_error_message()}")
if buffer_ptr is not NULL:
data = PyMemoryView_FromObject(
PQBuffer._from_buffer(<unsigned char *>buffer_ptr, nbytes))
out = libpq.PQencryptPasswordConn(self._pgconn_ptr, passwd, user, calgo)
if not out:
raise e.OperationalError(
- f"password encryption failed: {error_message(self)}"
+ f"password encryption failed: {self.get_error_message()}"
)
rv = bytes(out)
res = libpq.PQchangePassword(self._pgconn_ptr, user, passwd)
if libpq.PQresultStatus(res) != ExecStatus.COMMAND_OK:
raise e.OperationalError(
- f"password encryption failed: {error_message(self)}"
+ f"password encryption failed: {self.get_error_message()}"
)
def make_empty_result(self, int exec_status) -> PGresult:
"""
_check_supported("PQexitPipelineMode", 140000)
if libpq.PQexitPipelineMode(self._pgconn_ptr) != 1:
- raise e.OperationalError(error_message(self))
+ raise e.OperationalError(self.get_error_message())
def pipeline_sync(self) -> None:
"""Mark a synchronization point in a pipeline.
_check_supported("PQsendFlushRequest ", 140000)
cdef int rv = libpq.PQsendFlushRequest(self._pgconn_ptr)
if rv == 0:
- raise e.OperationalError(f"flush request failed: {error_message(self)}")
+ raise e.OperationalError(f"flush request failed: {self.get_error_message()}")
cdef int _ensure_pgconn(PGconn pgconn) except 0:
def error_message(self) -> bytes:
return libpq.PQresultErrorMessage(self._pgresult_ptr)
+ def get_error_message(self, encoding: str = "utf-8") -> str:
+ return _clean_error_message(self.error_message, encoding)
+
def error_field(self, int fieldcode) -> bytes | None:
cdef char * rv = libpq.PQresultErrorField(self._pgresult_ptr, fieldcode)
if rv is not NULL:
conn = pq.PGconn.connect(dsn.encode())
if conn.status != pq.ConnStatus.OK:
- pytest.fail(f"bad connection: {conn.error_message.decode('utf8', 'replace')}")
+ pytest.fail(f"bad connection: {conn.get_error_message()}")
with maybe_trace(conn, tracefile, request.function):
yield conn
assert msg == pq.error_message(res)
primary = res.error_field(pq.DiagnosticField.MESSAGE_PRIMARY)
assert primary.decode("ascii") in msg
-
- with pytest.raises(TypeError):
- pq.error_message(None) # type: ignore[arg-type]
-
res.clear()
- assert pq.error_message(res) == "no details available"
+ assert pq.error_message(res) == "no error details available"
pgconn.finish()
assert "NULL" in pq.error_message(pgconn)
assert b"NULL" in pgconn.error_message # TODO: i10n?
+def test_get_error_message(pgconn):
+ assert pgconn.get_error_message() == "no error details available"
+ res = pgconn.exec_(b"wat")
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+ msg = pgconn.get_error_message()
+ assert "wat" in msg
+ pgconn.finish()
+ assert "NULL" in pgconn.get_error_message()
+
+
def test_backend_pid(pgconn):
assert isinstance(pgconn.backend_pid, int)
pgconn.finish()
monitor_conn = pq.PGconn.connect(dsn)
assert (
monitor_conn.status == pq.ConnStatus.OK
- ), f"bad connection: {monitor_conn.error_message.decode('utf8', 'replace')}"
+ ), f"bad connection: {monitor_conn.get_error_message()}"
pgconn.send_query_params(b"SELECT pg_sleep($1)", [b"180"])
cancel_conn.poll()
with pytest.raises(psycopg.OperationalError):
cancel_conn.reset()
- assert cancel_conn.error_message.strip() == "connection pointer is NULL"
+ assert cancel_conn.get_error_message() == "connection pointer is NULL"
def test_cancel(pgconn):
user, passwd = "ashesh", "psycopg2"
r = pgconn.exec_(f"CREATE USER {user} LOGIN PASSWORD '{passwd}'".encode())
if r.status != pq.ExecStatus.COMMAND_OK:
- pytest.skip(f"cannot create a PostgreSQL role: {r.error_message.decode()}")
+ pytest.skip(f"cannot create a PostgreSQL role: {r.get_error_message()}")
yield user.encode(), passwd.encode()
r = pgconn.exec_(f"DROP USER {user}".encode())
if r.status != pq.ExecStatus.COMMAND_OK:
- pytest.fail(f"failed to drop {user} role: {r.error_message.decode()}")
+ pytest.fail(f"failed to drop {user} role: {r.get_error_message()}")
@pytest.mark.libpq(">= 17")
@pytest.mark.crdb_skip("password_encryption")
def test_encrypt_password_query(pgconn):
res = pgconn.exec_(b"set password_encryption to 'md5'")
- assert res.status == pq.ExecStatus.COMMAND_OK, pgconn.error_message.decode()
+ assert res.status == pq.ExecStatus.COMMAND_OK, pgconn.get_error_message()
enc = pgconn.encrypt_password(b"psycopg2", b"ashesh")
assert enc == b"md594839d658c28a357126f105b9cb14cfc"
assert res.error_message == b""
+def test_get_error_message(pgconn):
+ res = pgconn.exec_(b"select 1")
+ assert res.get_error_message() == "no error details available"
+ res = pgconn.exec_(b"select wat")
+ assert "wat" in res.get_error_message()
+ res.clear()
+ assert res.get_error_message() == "no error details available"
+
+
def test_error_field(pgconn):
res = pgconn.exec_(b"select wat")
# https://github.com/cockroachdb/cockroach/issues/81794