.. automethod:: commit
.. automethod:: rollback
.. automethod:: close
+
+ .. rubric:: Checking the connection state
+
.. autoproperty:: closed
+ .. autoproperty:: client_encoding
+
+ The property is writable for sync connections, read-only for async
+ ones: you can call `~AsyncConnection.set_client_encoding()` on those.
.. rubric:: Methods you will need if you do something cool
.. automethod:: commit
.. automethod:: rollback
.. automethod:: notifies
+ .. automethod:: set_client_encoding
.. autoclass:: psycopg3.Notify
elif isinstance(context, BaseCursor):
self._connection = context.connection
- self._encoding = context.connection.pyenc
+ self._encoding = context.connection.client_encoding
self._dumpers = {}
self._dumpers_maps.extend(
(self._dumpers, context.dumpers, context.connection.dumpers)
elif isinstance(context, BaseConnection):
self._connection = context
- self._encoding = context.pyenc
+ self._encoding = context.client_encoding
self._dumpers = {}
self._dumpers_maps.extend((self._dumpers, context.dumpers))
self._loaders = {}
from functools import partial
from . import pq
-from . import errors as e
-from . import cursor
from . import proto
+from . import cursor
+from . import errors as e
+from . import encodings
from .pq import TransactionStatus, ExecStatus
-from .conninfo import make_conninfo
from .waiting import wait, wait_async
+from .conninfo import make_conninfo
from .generators import notifies
logger = logging.getLogger(__name__)
self.loaders: proto.LoadersMap = {}
self._notice_handlers: List[NoticeHandler] = []
self._notify_handlers: List[NotifyHandler] = []
- # postgres name of the client encoding (in bytes)
- self._pgenc = b""
- # python name of the client encoding
- self._pyenc = "utf-8"
wself = ref(self)
raise NotImplementedError
return self.cursor_factory(self, format=format)
- @property
- def pyenc(self) -> str:
- pgenc = self.pgconn.parameter_status(b"client_encoding") or b""
- if self._pgenc != pgenc:
- if pgenc:
- try:
- self._pyenc = pq.py_codecs[pgenc]
- except KeyError:
- raise e.NotSupportedError(
- f"encoding {pgenc.decode('ascii')} not available in Python"
- )
-
- self._pgenc = pgenc
- return self._pyenc
-
@property
def client_encoding(self) -> str:
- rv = self.pgconn.parameter_status(b"client_encoding")
- return rv.decode("utf-8") if rv else "UTF8"
+ """The Python codec name of the connection's client encoding."""
+ pgenc = self.pgconn.parameter_status(b"client_encoding") or b"UTF8"
+ return encodings.pg2py(pgenc)
@client_encoding.setter
- def client_encoding(self, value: str) -> None:
- self._set_client_encoding(value)
+ def client_encoding(self, name: str) -> None:
+ self._set_client_encoding(name)
- def _set_client_encoding(self, value: str) -> None:
+ def _set_client_encoding(self, name: str) -> None:
raise NotImplementedError
def cancel(self) -> None:
if not (self and self._notice_handler):
return
- diag = e.Diagnostic(res, self._pyenc)
+ diag = e.Diagnostic(res, self.client_encoding)
for cb in self._notice_handlers:
try:
cb(diag)
if not (self and self._notify_handlers):
return
- n = Notify(
- pgn.relname.decode(self._pyenc),
- pgn.extra.decode(self._pyenc),
- pgn.be_pid,
- )
+ enc = self.client_encoding
+ n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
for cb in self._notify_handlers:
cb(n)
if pgres.status != ExecStatus.COMMAND_OK:
raise e.OperationalError(
"error on begin:"
- f" {pq.error_message(pgres, encoding=self._pyenc)}"
+ f" {pq.error_message(pgres, encoding=self.client_encoding)}"
)
def commit(self) -> None:
if results[-1].status != ExecStatus.COMMAND_OK:
raise e.OperationalError(
f"error on {command.decode('utf8')}:"
- f" {pq.error_message(results[-1], encoding=self._pyenc)}"
+ f" {pq.error_message(results[-1], encoding=self.client_encoding)}"
)
@classmethod
) -> proto.RV:
return wait(gen, timeout=timeout)
- def _set_client_encoding(self, value: str) -> None:
+ def _set_client_encoding(self, name: str) -> None:
with self.lock:
self.pgconn.send_query_params(
b"select set_config('client_encoding', $1, false)",
- [value.encode("ascii")],
+ [encodings.py2pg(name)],
)
gen = execute(self.pgconn)
(result,) = self.wait(gen)
if result.status != ExecStatus.TUPLES_OK:
- raise e.error_from_result(result, encoding=self._pyenc)
+ raise e.error_from_result(
+ result, encoding=self.client_encoding
+ )
def notifies(self) -> Iterator[Notify]:
"""Generate a stream of `Notify`"""
while 1:
with self.lock:
ns = self.wait(notifies(self.pgconn))
+ enc = self.client_encoding
for pgn in ns:
n = Notify(
- pgn.relname.decode(self._pyenc),
- pgn.extra.decode(self._pyenc),
- pgn.be_pid,
+ pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid
)
yield n
if pgres.status != ExecStatus.COMMAND_OK:
raise e.OperationalError(
"error on begin:"
- f" {pq.error_message(pgres, encoding=self._pyenc)}"
+ f" {pq.error_message(pgres, encoding=self.client_encoding)}"
)
async def commit(self) -> None:
if pgres.status != ExecStatus.COMMAND_OK:
raise e.OperationalError(
f"error on {command.decode('utf8')}:"
- f" {pq.error_message(pgres, encoding=self._pyenc)}"
+ f" {pq.error_message(pgres, encoding=self.client_encoding)}"
)
@classmethod
async def wait(cls, gen: proto.PQGen[proto.RV]) -> proto.RV:
return await wait_async(gen)
- def _set_client_encoding(self, value: str) -> None:
+ def _set_client_encoding(self, name: str) -> None:
raise AttributeError(
"'client_encoding' is read-only on async connections:"
" please use await .set_client_encoding() instead."
)
- async def set_client_encoding(self, value: str) -> None:
+ async def set_client_encoding(self, name: str) -> None:
+ """Async version of the `client_encoding` setter."""
async with self.lock:
self.pgconn.send_query_params(
b"select set_config('client_encoding', $1, false)",
- [value.encode("ascii")],
+ [name.encode("utf-8")],
)
gen = execute(self.pgconn)
(result,) = await self.wait(gen)
if result.status != ExecStatus.TUPLES_OK:
- raise e.error_from_result(result, encoding=self._pyenc)
+ raise e.error_from_result(
+ result, encoding=self.client_encoding
+ )
async def notifies(self) -> AsyncIterator[Notify]:
while 1:
async with self.lock:
ns = await self.wait(notifies(self.pgconn))
+ enc = self.client_encoding
for pgn in ns:
n = Notify(
- pgn.relname.decode(self._pyenc),
- pgn.extra.decode(self._pyenc),
- pgn.be_pid,
+ pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid
)
yield n
raise TypeError(
"cannot copy str data in binary mode: use bytes instead"
)
- self._encoding = self.connection.pyenc
+ self._encoding = self.connection.client_encoding
return data.encode(self._encoding)
else:
def finish(self, error: str = "") -> None:
conn = self.connection
- berr = error.encode(conn.pyenc, "replace") if error else None
+ berr = error.encode(conn.client_encoding, "replace") if error else None
conn.wait(copy_end(conn.pgconn, berr))
self._finished = True
async def finish(self, error: str = "") -> None:
conn = self.connection
- berr = error.encode(conn.pyenc, "replace") if error else None
+ berr = error.encode(conn.client_encoding, "replace") if error else None
await conn.wait(copy_end(conn.pgconn, berr))
self._finished = True
res = self.pgresult
if not res or res.status != self.ExecStatus.TUPLES_OK:
return None
- encoding = self.connection.pyenc
+ encoding = self.connection.client_encoding
return [Column(res, i, encoding) for i in range(res.nfields)]
@property
if results[-1].status == S.FATAL_ERROR:
raise e.error_from_result(
- results[-1], encoding=self.connection.pyenc
+ results[-1], encoding=self.connection.client_encoding
)
elif badstats & {S.COPY_IN, S.COPY_OUT, S.COPY_BOTH}:
if status in (pq.ExecStatus.COPY_IN, pq.ExecStatus.COPY_OUT):
return
elif status == pq.ExecStatus.FATAL_ERROR:
- raise e.error_from_result(result, encoding=self.connection.pyenc)
+ raise e.error_from_result(
+ result, encoding=self.connection.client_encoding
+ )
else:
raise e.ProgrammingError(
"copy() should be used only with COPY ... TO STDOUT or COPY ..."
(result,) = self.connection.wait(gen)
if result.status == self.ExecStatus.FATAL_ERROR:
raise e.error_from_result(
- result, encoding=self.connection.pyenc
+ result, encoding=self.connection.client_encoding
)
else:
pgq.dump(vars)
(result,) = await self.connection.wait(gen)
if result.status == self.ExecStatus.FATAL_ERROR:
raise e.error_from_result(
- result, encoding=self.connection.pyenc
+ result, encoding=self.connection.client_encoding
)
else:
pgq.dump(vars)
# Copyright (C) 2020 The Psycopg Team
+import codecs
from typing import Dict, Union
+from .errors import NotSupportedError
+
_py_codecs = {
"BIG5": "big5",
"EUC_CN": "gb2312",
"WIN874": "cp874",
}
-py_codecs: Dict[Union[bytes, str, None], str] = {}
+py_codecs: Dict[Union[bytes, str], str] = {}
py_codecs.update((k, v) for k, v in _py_codecs.items())
-py_codecs.update((k.encode("ascii"), v) for k, v in _py_codecs.items())
+py_codecs.update((k.encode("utf-8"), v) for k, v in _py_codecs.items())
+
+pg_codecs = {v: k.encode("utf-8") for k, v in _py_codecs.items()}
+
+
+def py2pg(name: str) -> bytes:
+ """Convert a Python encoding name to PostgreSQL encoding name.
+
+ Raise LookupError if the Python encoding is unknown.
+ """
+ return pg_codecs[codecs.lookup(name).name]
+
+
+def pg2py(name: Union[bytes, str]) -> str:
+ """Convert a Python encoding name to PostgreSQL encoding name.
+
+ Raise NotSupportedError if the PostgreSQL encoding is not supported by
+ Python.
+ """
+ try:
+ return py_codecs[name]
+ except KeyError:
+ raise NotSupportedError("codec not available in Python: {name!r}")
from . import errors as e
from .proto import PQGen
from .waiting import Wait, Ready
+from .encodings import py_codecs
logger = logging.getLogger(__name__)
f"1 result expected from copy end, got {len(results)}"
)
if results[0].status != pq.ExecStatus.COMMAND_OK:
- encoding = pq.py_codecs.get(
- pgconn.parameter_status(b"client_encoding"), "utf8"
+ encoding = py_codecs.get(
+ pgconn.parameter_status(b"client_encoding") or "", "utf-8"
)
raise e.error_from_result(results[0], encoding=encoding)
f"1 result expected from copy end, got {len(results)}"
)
if results[0].status != pq.ExecStatus.COMMAND_OK:
- encoding = pq.py_codecs.get(
- pgconn.parameter_status(b"client_encoding"), "utf8"
+ encoding = py_codecs.get(
+ pgconn.parameter_status(b"client_encoding") or "", "utf-8"
)
raise e.error_from_result(results[0], encoding=encoding)
DiagnosticField,
Format,
)
-from .encodings import py_codecs
from .misc import ConninfoOption, PQerror, PGnotify, PGresAttDesc
from .misc import error_message
from . import proto
"PQerror",
"error_message",
"ConninfoOption",
- "py_codecs",
"version",
)
from ..errors import OperationalError
from .enums import DiagnosticField, ConnStatus
from .proto import PGconn, PGresult
-from .encodings import py_codecs
class PQerror(OperationalError):
bmsg = bmsg.splitlines()[0].split(b":", 1)[-1].strip()
elif hasattr(obj, "error_message"):
+ from psycopg3.encodings import py_codecs
+
# obj is a PGconn
obj = cast(PGconn, obj)
if obj.status == ConnStatus.OK:
encoding = py_codecs.get(
- obj.parameter_status(b"client_encoding"), "utf8"
+ obj.parameter_status(b"client_encoding") or "", "utf-8"
)
bmsg = obj.error_message
raise ValueError(f"no connection in the context: {context}")
esc = Escaping(conn.pgconn)
- enc = conn.pyenc
+ enc = conn.client_encoding
escs = [esc.escape_identifier(s.encode(enc)) for s in self._obj]
return b".".join(escs).decode(enc)
tx = context if isinstance(context, Transformer) else Transformer(conn)
dumper = tx.get_dumper(self._obj, Format.TEXT)
quoted = dumper.quote(self._obj)
- return quoted.decode(conn.pyenc if conn else "utf-8")
+ return quoted.decode(conn.client_encoding if conn else "utf-8")
class Placeholder(Composable):
def __init__(self, src: type, context: AdaptContext):
super().__init__(src, context)
+ self.encoding = "utf-8"
if self.connection:
- if self.connection.client_encoding != "SQL_ASCII":
- self.encoding = self.connection.pyenc
- else:
- self.encoding = "utf-8"
- else:
- self.encoding = "utf-8"
+ enc = self.connection.client_encoding
+ if enc != "ascii":
+ self.encoding = enc
@Dumper.binary(str)
super().__init__(oid, context)
if self.connection:
- if self.connection.client_encoding != "SQL_ASCII":
- self.encoding = self.connection.pyenc
+ enc = self.connection.client_encoding
+ if enc != "ascii":
+ self.encoding = enc
else:
self.encoding = ""
else:
class UnknownLoader(Loader):
def __init__(self, oid: int, context: AdaptContext):
super().__init__(oid, context)
- self.encoding = self.connection.pyenc if self.connection else "utf-8"
+ self.encoding = (
+ self.connection.client_encoding if self.connection else "utf-8"
+ )
def load(self, data: bytes) -> str:
return data.decode(self.encoding)
elif isinstance(context, BaseCursor):
self._connection = context.connection
- self._encoding = context.connection.pyenc
+ self._encoding = context.connection.client_encoding
self._dumpers = {}
self._dumpers_maps.extend(
(self._dumpers, context.dumpers, self.connection.dumpers)
elif isinstance(context, BaseConnection):
self._connection = context
- self._encoding = context.pyenc
+ self._encoding = context.client_encoding
self._dumpers = {}
self._dumpers_maps.extend((self._dumpers, context.dumpers))
self._loaders = {}
super().__init__(oid, context)
self.is_utf8 = 0
- self.encoding = NULL
+ self.encoding = "utf-8"
conn = self.connection
- if conn is not None:
- if conn.client_encoding == "UTF8":
+ if conn:
+ self._bytes_encoding = conn.client_encoding.encode("utf-8")
+ self.encoding = self._bytes_encoding
+ if self._bytes_encoding == b"utf-8":
self.is_utf8 = 1
- elif conn.client_encoding != "SQL_ASCII":
- self._bytes_encoding = conn.pyenc.encode("utf-8")
- self.encoding = self._bytes_encoding
- else:
- self.encoding = "utf-8"
+ elif self._bytes_encoding == b"ascii":
+ self.encoding = NULL
cdef object cload(self, const char *data, size_t length):
if self.is_utf8:
return PyUnicode_DecodeUTF8(<char *>data, length, NULL)
-
- if self.encoding:
+ elif self.encoding:
return PyUnicode_Decode(<char *>data, length, self.encoding, NULL)
else:
return data[:length]
import psycopg3
from psycopg3 import Connection, Notify
+from psycopg3 import encodings
from psycopg3.errors import UndefinedTable
from psycopg3.conninfo import conninfo_to_dict
def test_get_encoding(conn):
(enc,) = conn.cursor().execute("show client_encoding").fetchone()
- assert enc == conn.client_encoding
+ assert conn.client_encoding == encodings.pg2py(enc)
def test_set_encoding(conn):
- newenc = "LATIN1" if conn.client_encoding != "LATIN1" else "UTF8"
+ newenc = "iso8859-1" if conn.client_encoding != "iso8859-1" else "utf-8"
assert conn.client_encoding != newenc
conn.client_encoding = newenc
assert conn.client_encoding == newenc
(enc,) = conn.cursor().execute("show client_encoding").fetchone()
- assert enc == newenc
+ assert encodings.pg2py(enc) == newenc
@pytest.mark.parametrize(
)
def test_normalize_encoding(conn, enc, out, codec):
conn.client_encoding = enc
- assert conn.client_encoding == out
- assert conn.pyenc == codec
+ assert (
+ conn.pgconn.parameter_status(b"client_encoding").decode("utf-8") == out
+ )
+ assert conn.client_encoding == codec
@pytest.mark.parametrize(
def test_encoding_env_var(dsn, monkeypatch, enc, out, codec):
monkeypatch.setenv("PGCLIENTENCODING", enc)
conn = psycopg3.connect(dsn)
- assert conn.client_encoding == out
- assert conn.pyenc == codec
+ assert (
+ conn.pgconn.parameter_status(b"client_encoding").decode("utf-8") == out
+ )
+ assert conn.client_encoding == codec
def test_set_encoding_unsupported(conn):
- conn.client_encoding = "EUC_TW"
+ cur = conn.cursor()
+ cur.execute("set client_encoding to EUC_TW")
with pytest.raises(psycopg3.NotSupportedError):
- conn.cursor().execute("select 1")
+ cur.execute("select 'x'")
def test_set_encoding_bad(conn):
- with pytest.raises(psycopg3.DatabaseError):
+ with pytest.raises(LookupError):
conn.client_encoding = "WAT"
import weakref
import psycopg3
+from psycopg3 import encodings
from psycopg3 import AsyncConnection
from psycopg3.errors import UndefinedTable
from psycopg3.conninfo import conninfo_to_dict
cur = await aconn.cursor()
await cur.execute("show client_encoding")
(enc,) = await cur.fetchone()
- assert enc == aconn.client_encoding
+ assert aconn.client_encoding == encodings.pg2py(enc)
async def test_set_encoding(aconn):
- newenc = "LATIN1" if aconn.client_encoding != "LATIN1" else "UTF8"
+ newenc = "iso8859-1" if aconn.client_encoding != "iso8859-1" else "utf-8"
assert aconn.client_encoding != newenc
with pytest.raises(AttributeError):
aconn.client_encoding = newenc
cur = await aconn.cursor()
await cur.execute("show client_encoding")
(enc,) = await cur.fetchone()
- assert enc == newenc
+ assert encodings.pg2py(enc) == newenc
@pytest.mark.parametrize(
("utf_8", "UTF8", "utf-8"),
("eucjp", "EUC_JP", "euc_jp"),
("euc-jp", "EUC_JP", "euc_jp"),
+ ("latin9", "LATIN9", "iso8859-15"),
],
)
async def test_normalize_encoding(aconn, enc, out, codec):
await aconn.set_client_encoding(enc)
- assert aconn.client_encoding == out
- assert aconn.pyenc == codec
+ assert (
+ aconn.pgconn.parameter_status(b"client_encoding").decode("utf-8")
+ == out
+ )
+ assert aconn.client_encoding == codec
@pytest.mark.parametrize(
async def test_encoding_env_var(dsn, monkeypatch, enc, out, codec):
monkeypatch.setenv("PGCLIENTENCODING", enc)
aconn = await psycopg3.AsyncConnection.connect(dsn)
- assert aconn.client_encoding == out
- assert aconn.pyenc == codec
+ assert (
+ aconn.pgconn.parameter_status(b"client_encoding").decode("utf-8")
+ == out
+ )
+ assert aconn.client_encoding == codec
async def test_set_encoding_unsupported(aconn):
)
.format(sql.Identifier(procname), sql.Identifier(paramname))
.as_string(conn)
- .encode(conn.pyenc)
+ .encode(conn.client_encoding)
)
# execute regardless of sync/async conn
--- /dev/null
+import codecs
+import pytest
+
+import psycopg3
+from psycopg3 import encodings
+
+
+def test_names_normalised():
+ for name in encodings._py_codecs.values():
+ assert codecs.lookup(name).name == name
+
+
+@pytest.mark.parametrize(
+ "pyenc, pgenc",
+ [
+ ("ascii", "SQL_ASCII"),
+ ("utf8", "UTF8"),
+ ("utf-8", "UTF8"),
+ ("uTf-8", "UTF8"),
+ ("latin9", "LATIN9"),
+ ("iso8859-15", "LATIN9"),
+ ],
+)
+def test_py2pg(pyenc, pgenc):
+ assert encodings.py2pg(pyenc) == pgenc.encode("utf8")
+
+
+@pytest.mark.parametrize(
+ "pyenc, pgenc",
+ [
+ ("ascii", "SQL_ASCII"),
+ ("utf-8", "UTF8"),
+ ("iso8859-15", "LATIN9"),
+ ],
+)
+def test_pg2py(pyenc, pgenc):
+ assert encodings.pg2py(pgenc.encode("utf-8")) == pyenc
+
+
+@pytest.mark.parametrize("pgenc", ["MULE_INTERNAL", "EUC_TW"])
+def test_pg2py_missing(pgenc):
+ with pytest.raises(psycopg3.NotSupportedError):
+ encodings.pg2py(pgenc.encode("utf-8"))
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
-@pytest.mark.parametrize("encoding", ["utf8", "latin9", "sql_ascii"])
+@pytest.mark.parametrize("encoding", ["utf8", "latin9", "ascii"])
def test_dump_enc(conn, fmt_in, encoding):
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
def test_load_ascii(conn, typename, fmt_out):
cur = conn.cursor(format=fmt_out)
- conn.client_encoding = "sql_ascii"
+ conn.client_encoding = "ascii"
(res,) = cur.execute(
f"select chr(%s::int)::{typename}", (ord(eur),)
).fetchone()
def test_load_ascii_encanyway(conn, typename, fmt_out):
cur = conn.cursor(format=fmt_out)
- conn.client_encoding = "sql_ascii"
+ conn.client_encoding = "ascii"
(res,) = cur.execute(f"select 'aa'::{typename}").fetchone()
assert res == "aa"
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_text_array_ascii(conn, fmt_in, fmt_out):
- conn.client_encoding = "sql_ascii"
+ conn.client_encoding = "ascii"
cur = conn.cursor(format=fmt_out)
a = list(map(chr, range(1, 256))) + [eur]
exp = [s.encode("utf8") for s in a]