.. autoattribute:: info
- .. autoattribute:: client_encoding
-
- The value returned is always normalized to the Python codec
- `~codecs.CodecInfo.name`::
-
- conn.client_encoding = 'latin9'
- conn.client_encoding
- 'iso8859-15'
-
- and it reflects the current connection property, even if it is set
- outside Python::
-
- conn.execute("SET client_encoding TO LATIN1")
- conn.client_encoding
- 'iso8859-1'
-
- A few PostgreSQL encodings are not available in Python and cannot be
- selected (currently ``EUC_TW``, ``MULE_INTERNAL``). The PostgreSQL
- ``SQL_ASCII`` encoding has the special meaning of "no encoding": see
- :ref:`adapt-string` for details.
-
- .. seealso::
-
- The `PostgreSQL supported encodings`__.
-
- .. __: https://www.postgresql.org/docs/current/multibyte.html
-
-
.. autoattribute:: prepare_threshold
See :ref:`prepared-statements` for details.
``standard_conforming_strings``... See :pq:`PQparameterStatus()` for
all the available parameters.
+ .. autoattribute:: encoding
+
+ The value returned is always normalized to the Python codec
+ `~codecs.CodecInfo.name`::
+
+ conn.execute("SET client_encoding TO LATIN9")
+ conn.info.encoding
+ 'iso8859-15'
+
+ A few PostgreSQL encodings are not available in Python and cannot be
+ selected (currently ``EUC_TW``, ``MULE_INTERNAL``). The PostgreSQL
+ ``SQL_ASCII`` encoding has the special meaning of "no encoding": see
+ :ref:`adapt-string` for details.
+
+ .. seealso::
+
+ The `PostgreSQL supported encodings`__.
+
+ .. __: https://www.postgresql.org/docs/current/multibyte.html
+
.. rubric:: Objects involved in :ref:`transactions`
'Crème Brûlée at 4.99€'
PostgreSQL databases `have an encoding`__, and `the session has an encoding`__
-too, exposed in the `Connection.client_encoding` attribute. If your database
-and connection are in UTF-8 encoding you will likely have no problem,
-otherwise you will have to make sure that your application only deals with the
-non-ASCII chars that the database can handle; failing to do so may result in
-encoding/decoding errors:
+too, exposed in the `!Connection.info.`\ `~ConnectionInfo.encoding`
+attribute. If your database and connection are in UTF-8 encoding you will
+likely have no problem, otherwise you will have to make sure that your
+application only deals with the non-ASCII chars that the database can handle;
+failing to do so may result in encoding/decoding errors:
.. __: https://www.postgresql.org/docs/current/sql-createdatabase.html
.. __: https://www.postgresql.org/docs/current/multibyte.html
.. code:: python
# The encoding is set at connection time according to the db configuration
- conn.client_encoding
+ conn.info.encoding
'utf-8'
# The Latin-9 encoding can manage some European accented letters
# and the Euro symbol
- conn.client_encoding = 'latin9'
+ conn.execute("SET client_encoding TO LATIN9")
conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
'Crème Brûlée at 4.99€'
# The Latin-1 encoding doesn't have a representation for the Euro symbol
- conn.client_encoding = 'latin1'
+ conn.execute("SET client_encoding TO LATIN1")
conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
# Traceback (most recent call last)
# ...
# in encoding "UTF8" has no equivalent in encoding "LATIN1"
In rare cases you may have strings with unexpected encodings in the database.
-Using the ``SQL_ASCII`` client encoding (or setting
-`~Connection.client_encoding` ``= "ascii"``) will disable decoding of the data
+Using the ``SQL_ASCII`` client encoding will disable decoding of the data
coming from the database, which will be returned as `bytes`:
.. code:: python
- conn.client_encoding = "ascii"
+ conn.execute("SET client_encoding TO SQL_ASCII")
conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
b'Cr\xc3\xa8me Br\xc3\xbbl\xc3\xa9e at 4.99\xe2\x82\xac'
from operator import attrgetter
from . import errors as e
+from ._encodings import pgconn_encoding
if TYPE_CHECKING:
from .cursor import BaseCursor
if not fname:
raise e.InterfaceError(f"no name available for column {index}")
- self._name = fname.decode(cursor.connection.client_encoding)
+ self._name = fname.decode(pgconn_encoding(cursor._pgconn))
self._data = ColumnData(
ftype=res.ftype(index),
# Copyright (C) 2020-2021 The Psycopg Team
import codecs
-from typing import Dict, Union
+from typing import Dict, Union, TYPE_CHECKING
from .errors import NotSupportedError
+if TYPE_CHECKING:
+ from .pq.abc import PGconn
+
_py_codecs = {
"BIG5": "big5",
"EUC_CN": "gb2312",
pg_codecs = {v: k.encode() for k, v in _py_codecs.items()}
+def pgconn_encoding(pgconn: "PGconn") -> str:
+ pgenc = pgconn.parameter_status(b"client_encoding") or b"UTF8"
+ return pg2pyenc(pgenc)
+
+
def py2pgenc(name: str) -> bytes:
"""Convert a Python encoding name to PostgreSQL encoding name.
from .sql import Composable
from .abc import Buffer, Query, Params
from ._enums import PyFormat
+from ._encodings import pgconn_encoding
if TYPE_CHECKING:
from .abc import Transformer
conn = transformer.connection
if conn:
- self._encoding = conn.client_encoding
+ self._encoding = pgconn_encoding(conn.pgconn)
def convert(self, query: Query, vars: Optional[Params]) -> None:
"""
from ._cmodule import _psycopg
from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
from .generators import notifies
-from ._encodings import pg2pyenc
+from ._encodings import pgconn_encoding
from ._preparing import PrepareManager
from .transaction import Transaction
from .server_cursor import ServerCursor
f"{TransactionStatus(status).name}"
)
- @property
- def client_encoding(self) -> str:
- """The Python codec name of the connection's client encoding."""
- pgenc = self.pgconn.parameter_status(b"client_encoding") or b"UTF8"
- return pg2pyenc(pgenc)
-
@property
def info(self) -> ConnectionInfo:
"""A `ConnectionInfo` attribute to inspect connection properties."""
if not (self and self._notice_handler):
return
- diag = e.Diagnostic(res, self.client_encoding)
+ diag = e.Diagnostic(res, pgconn_encoding(self.pgconn))
for cb in self._notice_handlers:
try:
cb(diag)
if not (self and self._notify_handlers):
return
- enc = self.client_encoding
+ enc = pgconn_encoding(self.pgconn)
n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
for cb in self._notify_handlers:
cb(n)
)
if isinstance(command, str):
- command = command.encode(self.client_encoding)
+ command = command.encode(pgconn_encoding(self.pgconn))
elif isinstance(command, Composable):
command = command.as_bytes(self)
if result.status not in (ExecStatus.COMMAND_OK, ExecStatus.TUPLES_OK):
if result.status == ExecStatus.FATAL_ERROR:
raise e.error_from_result(
- result, encoding=self.client_encoding
+ result, encoding=pgconn_encoding(self.pgconn)
)
else:
raise e.InterfaceError(
while 1:
with self.lock:
ns = self.wait(notifies(self.pgconn))
- enc = self.client_encoding
+ enc = pgconn_encoding(self.pgconn)
for pgn in ns:
n = Notify(
pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid
from ._enums import IsolationLevel
from ._compat import asynccontextmanager
from .conninfo import make_conninfo, conninfo_to_dict
+from ._encodings import pgconn_encoding
from .connection import BaseConnection, CursorRow, Notify
from .generators import notifies
from .transaction import AsyncTransaction
while 1:
async with self.lock:
ns = await self.wait(notifies(self.pgconn))
- enc = self.client_encoding
+ enc = pgconn_encoding(self.pgconn)
for pgn in ns:
n = Notify(
pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid
from . import pq
from . import errors as e
from ._tz import get_tzinfo
-from ._encodings import pg2pyenc
+from ._encodings import pgconn_encoding
def make_conninfo(conninfo: str = "", **kwargs: Any) -> str:
`~Connection.connect()` or from environment variables. The password
is never returned (you can read it using the `password` attribute).
"""
- pyenc = self._pyenc
+ pyenc = self.encoding
# Get the known defaults to avoid reporting them
defaults = {
Return `None` is the parameter is unknown.
"""
- res = self.pgconn.parameter_status(param_name.encode(self._pyenc))
- return res.decode(self._pyenc) if res is not None else None
+ res = self.pgconn.parameter_status(param_name.encode(self.encoding))
+ return res.decode(self.encoding) if res is not None else None
@property
def server_version(self) -> int:
"""The Python timezone info of the connection's timezone."""
return get_tzinfo(self.pgconn)
+ @property
+ def encoding(self) -> str:
+ """The Python codec name of the connection's client encoding."""
+ return pgconn_encoding(self.pgconn)
+
def _get_pgconn_attr(self, name: str) -> str:
value: bytes = getattr(self.pgconn, name)
- return value.decode(self._pyenc)
-
- @property
- def _pyenc(self) -> str:
- pgenc = self.pgconn.parameter_status(b"client_encoding") or b"UTF8"
- return pg2pyenc(pgenc)
+ return value.decode(self.encoding)
from .adapt import PyFormat
from ._compat import create_task
from ._cmodule import _psycopg
+from ._encodings import pgconn_encoding
from .generators import copy_from, copy_to, copy_end
if TYPE_CHECKING:
if self._pgresult.binary_tuples == pq.Format.TEXT:
self.formatter = TextFormatter(
- tx, encoding=self.connection.client_encoding
+ tx, encoding=pgconn_encoding(self._pgconn)
)
else:
self.formatter = BinaryFormatter(tx)
bmsg: Optional[bytes]
if exc:
msg = f"error from Python: {type(exc).__qualname__} - {exc}"
- bmsg = msg.encode(self.connection.client_encoding, "replace")
+ bmsg = msg.encode(pgconn_encoding(self._pgconn), "replace")
else:
bmsg = None
from ._column import Column
from ._cmodule import _psycopg
from ._queries import PostgresQuery
+from ._encodings import pgconn_encoding
from ._preparing import Prepare
if TYPE_CHECKING:
_tx: "Transformer"
_make_row: RowMaker[Row]
+ _pgconn: "PGconn"
def __init__(self, connection: ConnectionType):
self._conn = connection
(result,) = yield from execute(self._pgconn)
if result.status == ExecStatus.FATAL_ERROR:
raise e.error_from_result(
- result, encoding=self._conn.client_encoding
+ result, encoding=pgconn_encoding(self._pgconn)
)
self._send_query_prepared(name, pgq, binary=binary)
badstats = statuses.difference(self._status_ok)
if results[-1].status == ExecStatus.FATAL_ERROR:
raise e.error_from_result(
- results[-1], encoding=self._conn.client_encoding
+ results[-1], encoding=pgconn_encoding(self._pgconn)
)
elif statuses.intersection(self._status_copy):
raise e.ProgrammingError(
return
elif status == ExecStatus.FATAL_ERROR:
raise e.error_from_result(
- result, encoding=self._conn.client_encoding
+ result, encoding=pgconn_encoding(self._pgconn)
)
else:
raise e.ProgrammingError(
from .rows import Row, RowFactory, AsyncRowFactory
from .cursor import AnyCursor, BaseCursor, Cursor, execute
from .cursor_async import AsyncCursor
+from ._encodings import pgconn_encoding
if TYPE_CHECKING:
from .connection import Connection
) -> PQGen[None]:
conn = cur._conn
conn.pgconn.send_describe_portal(
- self.name.encode(conn.client_encoding)
+ self.name.encode(pgconn_encoding(conn.pgconn))
)
results = yield from execute(conn.pgconn)
cur._execute_results(results, format=self.format)
) -> sql.Composable:
if isinstance(query, bytes):
- query = query.decode(cur._conn.client_encoding)
+ query = query.decode(pgconn_encoding(cur._conn.pgconn))
if not isinstance(query, sql.Composable):
query = sql.SQL(query)
from .pq import Escaping
from .abc import AdaptContext
from .adapt import Transformer, PyFormat
+from ._encodings import pgconn_encoding
def quote(obj: Any, context: Optional[AdaptContext] = None) -> str:
"""
conn = context.connection if context else None
- enc = conn.client_encoding if conn else "utf-8"
+ enc = pgconn_encoding(conn.pgconn) if conn else "utf-8"
b = self.as_bytes(context)
if isinstance(b, bytes):
return b.decode(enc)
if context:
conn = context.connection
if conn:
- enc = conn.client_encoding
+ enc = pgconn_encoding(conn.pgconn)
return self._obj.encode(enc)
def format(self, *args: Any, **kwargs: Any) -> Composed:
if not conn:
raise ValueError("a connection is necessary for Identifier")
esc = Escaping(conn.pgconn)
- enc = conn.client_encoding
+ enc = pgconn_encoding(conn.pgconn)
escs = [esc.escape_identifier(s.encode(enc)) for s in self._obj]
return b".".join(escs)
def as_bytes(self, context: Optional[AdaptContext]) -> bytes:
conn = context.connection if context else None
- enc = conn.client_encoding if conn else "utf-8"
+ enc = pgconn_encoding(conn.pgconn) if conn else "utf-8"
return self.as_string(context).encode(enc)
from ..abc import AdaptContext
from ..adapt import Buffer, Dumper, Loader
from ..errors import DataError
+from .._encodings import pgconn_encoding
if TYPE_CHECKING:
from ..pq.abc import Escaping as EscapingProto
conn = self.connection
if conn:
- enc = conn.client_encoding
+ enc = pgconn_encoding(conn.pgconn)
if enc != "ascii":
self._encoding = enc
super().__init__(oid, context)
conn = self.connection
if conn:
- enc = conn.client_encoding
+ enc = pgconn_encoding(conn.pgconn)
self._encoding = enc if enc != "ascii" else ""
def load(self, data: Buffer) -> Union[bytes, str]:
def _exec_command(command, *args, **kwargs):
cmdcopy = command
if isinstance(cmdcopy, bytes):
- cmdcopy = cmdcopy.decode(conn.client_encoding)
+ cmdcopy = cmdcopy.decode(conn.info.encoding)
elif isinstance(cmdcopy, sql.Composable):
cmdcopy = cmdcopy.as_string(conn)
from threading import Thread
import psycopg
-from psycopg._encodings import pg2pyenc
from psycopg import Connection, Notify
from psycopg.rows import tuple_row
from psycopg.errors import UndefinedTable
assert not conn.autocommit
-def test_get_encoding(conn):
- (enc,) = conn.cursor().execute("show client_encoding").fetchone()
- assert conn.client_encoding == pg2pyenc(enc)
-
-
-@pytest.mark.parametrize(
- "enc, out, codec",
- [
- ("utf8", "UTF8", "utf-8"),
- ("utf-8", "UTF8", "utf-8"),
- ("utf_8", "UTF8", "utf-8"),
- ("eucjp", "EUC_JP", "euc_jp"),
- ("euc-jp", "EUC_JP", "euc_jp"),
- ("latin9", "LATIN9", "iso8859-15"),
- ],
-)
-def test_normalize_encoding(conn, enc, out, codec):
- conn.execute("select set_config('client_encoding', %s, false)", [enc])
- assert conn.pgconn.parameter_status(b"client_encoding").decode() == out
- assert conn.client_encoding == codec
-
-
-@pytest.mark.parametrize(
- "enc, out, codec",
- [
- ("utf8", "UTF8", "utf-8"),
- ("utf-8", "UTF8", "utf-8"),
- ("utf_8", "UTF8", "utf-8"),
- ("eucjp", "EUC_JP", "euc_jp"),
- ("euc-jp", "EUC_JP", "euc_jp"),
- ],
-)
-def test_encoding_env_var(dsn, monkeypatch, enc, out, codec):
- monkeypatch.setenv("PGCLIENTENCODING", enc)
- conn = psycopg.connect(dsn)
- assert conn.pgconn.parameter_status(b"client_encoding").decode() == out
- assert conn.client_encoding == codec
-
-
-def test_set_encoding_unsupported(conn):
- cur = conn.cursor()
- cur.execute("set client_encoding to EUC_TW")
- with pytest.raises(psycopg.NotSupportedError):
- cur.execute("select 'x'")
-
-
@pytest.mark.parametrize(
"args, kwargs, want",
[
from psycopg.rows import tuple_row
from psycopg.errors import UndefinedTable
from psycopg.conninfo import conninfo_to_dict, make_conninfo
-from psycopg._encodings import pg2pyenc
from .utils import gc_collect
from .test_cursor import my_row_factory
assert not aconn.autocommit
-async def test_get_encoding(aconn):
- cur = aconn.cursor()
- await cur.execute("show client_encoding")
- (enc,) = await cur.fetchone()
- assert aconn.client_encoding == pg2pyenc(enc)
-
-
-@pytest.mark.parametrize(
- "enc, out, codec",
- [
- ("utf8", "UTF8", "utf-8"),
- ("utf-8", "UTF8", "utf-8"),
- ("utf_8", "UTF8", "utf-8"),
- ("eucjp", "EUC_JP", "euc_jp"),
- ("euc-jp", "EUC_JP", "euc_jp"),
- ("latin9", "LATIN9", "iso8859-15"),
- ],
-)
-async def test_normalize_encoding(aconn, enc, out, codec):
- await aconn.execute(
- "select set_config('client_encoding', %s, false)", [enc]
- )
- assert aconn.pgconn.parameter_status(b"client_encoding").decode() == out
- assert aconn.client_encoding == codec
-
-
-@pytest.mark.parametrize(
- "enc, out, codec",
- [
- ("utf8", "UTF8", "utf-8"),
- ("utf-8", "UTF8", "utf-8"),
- ("utf_8", "UTF8", "utf-8"),
- ("eucjp", "EUC_JP", "euc_jp"),
- ("euc-jp", "EUC_JP", "euc_jp"),
- ],
-)
-async def test_encoding_env_var(dsn, monkeypatch, enc, out, codec):
- monkeypatch.setenv("PGCLIENTENCODING", enc)
- aconn = await psycopg.AsyncConnection.connect(dsn)
- assert aconn.pgconn.parameter_status(b"client_encoding").decode() == out
- assert aconn.client_encoding == codec
-
-
-async def test_set_encoding_unsupported(aconn):
- cur = aconn.cursor()
- await cur.execute("set client_encoding to EUC_TW")
- with pytest.raises(psycopg.NotSupportedError):
- await cur.execute("select 'x'")
-
-
@pytest.mark.parametrize(
"args, kwargs, want",
[
import psycopg
from psycopg import ProgrammingError
from psycopg.conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
+from psycopg._encodings import pg2pyenc
snowman = "\u2603"
conn.info.timezone
assert len(caplog.records) == 2
assert "FOOBAAR0" in caplog.records[1].message
+
+ def test_encoding(self, conn):
+ enc = conn.execute("show client_encoding").fetchone()[0]
+ assert conn.info.encoding == pg2pyenc(enc)
+
+ @pytest.mark.parametrize(
+ "enc, out, codec",
+ [
+ ("utf8", "UTF8", "utf-8"),
+ ("utf-8", "UTF8", "utf-8"),
+ ("utf_8", "UTF8", "utf-8"),
+ ("eucjp", "EUC_JP", "euc_jp"),
+ ("euc-jp", "EUC_JP", "euc_jp"),
+ ("latin9", "LATIN9", "iso8859-15"),
+ ],
+ )
+ def test_normalize_encoding(self, conn, enc, out, codec):
+ conn.execute("select set_config('client_encoding', %s, false)", [enc])
+ assert conn.info.parameter_status("client_encoding") == out
+ assert conn.info.encoding == codec
+
+ @pytest.mark.parametrize(
+ "enc, out, codec",
+ [
+ ("utf8", "UTF8", "utf-8"),
+ ("utf-8", "UTF8", "utf-8"),
+ ("utf_8", "UTF8", "utf-8"),
+ ("eucjp", "EUC_JP", "euc_jp"),
+ ("euc-jp", "EUC_JP", "euc_jp"),
+ ],
+ )
+ def test_encoding_env_var(self, dsn, monkeypatch, enc, out, codec):
+ monkeypatch.setenv("PGCLIENTENCODING", enc)
+ conn = psycopg.connect(dsn)
+ assert conn.info.parameter_status("client_encoding") == out
+ assert conn.info.encoding == codec
+
+ def test_set_encoding_unsupported(self, conn):
+ cur = conn.cursor()
+ cur.execute("set client_encoding to EUC_TW")
+ with pytest.raises(psycopg.NotSupportedError):
+ cur.execute("select 'x'")