@property
def codec(self) -> codecs.CodecInfo:
# TODO: utf8 fastpath?
- pgenc = self.pgconn.parameter_status(b"client_encoding")
+ pgenc = self.pgconn.parameter_status(b"client_encoding") or b""
if self._pgenc != pgenc:
- try:
- pyenc = pq.py_codecs[pgenc.decode("ascii")]
- except KeyError:
- raise e.NotSupportedError(
- f"encoding {pgenc.decode('ascii')} not available in Python"
- )
- self._codec = codecs.lookup(pyenc)
+ if pgenc:
+ try:
+ pyenc = pq.py_codecs[pgenc.decode("ascii")]
+ except KeyError:
+ raise e.NotSupportedError(
+ f"encoding {pgenc.decode('ascii')} not available in Python"
+ )
+ self._codec = codecs.lookup(pyenc)
+ else:
+ # fallback for a connection closed whose codec was never asked
+ if not hasattr(self, "_codec"):
+ self._codec = codecs.lookup("utf8")
+
self._pgenc = pgenc
return self._codec
@property
def encoding(self) -> str:
- return self.pgconn.parameter_status(b"client_encoding").decode("ascii")
+ rv = self.pgconn.parameter_status(b"client_encoding")
+ if rv is not None:
+ return rv.decode("ascii")
+ else:
+ return "UTF8"
@classmethod
def _connect_gen(cls, conninfo: str) -> ConnectGen:
) -> RV:
return wait(gen, timeout=timeout)
- @property
- def encoding(self) -> str:
- return self.pgconn.parameter_status(b"client_encoding").decode("ascii")
-
- @encoding.setter
- def encoding(self, value: str) -> None:
+ def set_client_encoding(self, value: str) -> None:
with self.lock:
self.pgconn.send_query_params(
b"select set_config('client_encoding', $1, false)",
async def wait(cls, gen: Generator[Tuple[int, Wait], Ready, RV]) -> RV:
return await wait_async(gen)
- @property
- def encoding(self) -> str:
- return self.pgconn.parameter_status(b"client_encoding").decode("ascii")
-
- @encoding.setter
- def encoding(self, value: str) -> None:
- raise e.NotSupportedError(
- "you can't set 'encoding' on an async connection."
- " Use 'await conn.set_encoding()' instead"
- )
-
- async def set_encoding(self, value: str) -> None:
+ async def set_client_encoding(self, value: str) -> None:
async with self.lock:
self.pgconn.send_query_params(
b"select set_config('client_encoding', $1, false)",
rv = impl.PQtransactionStatus(self.pgconn_ptr)
return TransactionStatus(rv)
- def parameter_status(self, name: bytes) -> bytes:
+ def parameter_status(self, name: bytes) -> Optional[bytes]:
return impl.PQparameterStatus(self.pgconn_ptr, name)
@property
assert enc == aconn.encoding
-def test_set_encoding_noprop(aconn):
- newenc = "LATIN1" if aconn.encoding != "LATIN1" else "UTF8"
- assert aconn.encoding != newenc
- with pytest.raises(psycopg3.NotSupportedError):
- aconn.encoding = newenc
-
-
def test_set_encoding(aconn, loop):
newenc = "LATIN1" if aconn.encoding != "LATIN1" else "UTF8"
assert aconn.encoding != newenc
- loop.run_until_complete(aconn.set_encoding(newenc))
+ loop.run_until_complete(aconn.set_client_encoding(newenc))
assert aconn.encoding == newenc
cur = aconn.cursor()
loop.run_until_complete(cur.execute("show client_encoding"))
def test_set_encoding_bad(aconn, loop):
with pytest.raises(psycopg3.DatabaseError):
- loop.run_until_complete(aconn.set_encoding("WAT"))
+ loop.run_until_complete(aconn.set_client_encoding("WAT"))
def test_set_encoding(conn):
newenc = "LATIN1" if conn.encoding != "LATIN1" else "UTF8"
assert conn.encoding != newenc
- conn.encoding = newenc
+ conn.set_client_encoding(newenc)
assert conn.encoding == newenc
(enc,) = conn.cursor().execute("show client_encoding").fetchone()
assert enc == newenc
def test_set_encoding_unsupported(conn):
- conn.encoding = "EUC_TW"
+ conn.set_client_encoding("EUC_TW")
with pytest.raises(psycopg3.NotSupportedError):
conn.cursor().execute("select 1")
def test_set_encoding_bad(conn):
with pytest.raises(psycopg3.DatabaseError):
- conn.encoding = "WAT"
+ conn.set_client_encoding("WAT")
@pytest.mark.parametrize("encoding", ["utf8", "latin9"])
def test_query_encode(conn, encoding):
- conn.encoding = encoding
+ conn.set_client_encoding(encoding)
cur = conn.cursor()
(res,) = cur.execute("select '\u20ac'").fetchone()
assert res == "\u20ac"
def test_query_badenc(conn):
- conn.encoding = "latin1"
+ conn.set_client_encoding("latin1")
cur = conn.cursor()
with pytest.raises(UnicodeEncodeError):
cur.execute("select '\u20ac'")
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
- conn.encoding = encoding
+ conn.set_client_encoding(encoding)
(res,) = cur.execute(f"select {ph}::bytea", (eur,)).fetchone()
assert res == eur.encode("utf8")
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
- conn.encoding = "sql_ascii"
+ conn.set_client_encoding("sql_ascii")
(res,) = cur.execute(f"select ascii({ph})", (eur,)).fetchone()
assert res == ord(eur)
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
- conn.encoding = "latin1"
+ conn.set_client_encoding("latin1")
with pytest.raises(UnicodeEncodeError):
cur.execute(f"select {ph}::bytea", (eur,))
def test_load_enc(conn, typename, encoding, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
- conn.encoding = encoding
+ conn.set_client_encoding(encoding)
(res,) = cur.execute(
f"select chr(%s::int)::{typename}", (ord(eur),)
).fetchone()
def test_load_badenc(conn, typename, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
- conn.encoding = "latin1"
+ conn.set_client_encoding("latin1")
with pytest.raises(psycopg3.DatabaseError):
cur.execute(f"select chr(%s::int)::{typename}", (ord(eur),))
def test_load_ascii(conn, typename, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
- conn.encoding = "sql_ascii"
+ conn.set_client_encoding("sql_ascii")
(res,) = cur.execute(
f"select chr(%s::int)::{typename}", (ord(eur),)
).fetchone()
def test_load_ascii_encanyway(conn, typename, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
- conn.encoding = "sql_ascii"
+ conn.set_client_encoding("sql_ascii")
(res,) = cur.execute(f"select 'aa'::{typename}").fetchone()
assert res == "aa"
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_text_array_ascii(conn, fmt_in, fmt_out):
- conn.encoding = "sql_ascii"
+ conn.set_client_encoding("sql_ascii")
cur = conn.cursor(binary=fmt_out == Format.BINARY)
a = list(map(chr, range(1, 256))) + [eur]
exp = [s.encode("utf8") for s in a]