return self._codec
@property
- def encoding(self) -> str:
+ def client_encoding(self) -> str:
rv = self.pgconn.parameter_status(b"client_encoding")
if rv is not None:
return rv.decode("ascii")
else:
return "UTF8"
- @encoding.setter
- def encoding(self, value: str) -> None:
+ @client_encoding.setter
+ def client_encoding(self, value: str) -> None:
self._set_client_encoding(value)
def _set_client_encoding(self, value: str) -> None:
def _set_client_encoding(self, value: str) -> None:
raise AttributeError(
- "'encoding' is read-only on async connections:"
+ "'client_encoding' is read-only on async connections:"
" please use await .set_client_encoding() instead."
)
self._encode: EncodeFunc
if self.connection is not None:
- if self.connection.encoding != "SQL_ASCII":
+ if self.connection.client_encoding != "SQL_ASCII":
self._encode = self.connection.codec.encode
else:
self._encode = codecs.lookup("utf8").encode
super().__init__(oid, context)
if self.connection is not None:
- if self.connection.encoding != "SQL_ASCII":
+ if self.connection.client_encoding != "SQL_ASCII":
self.decode = self.connection.codec.decode
else:
self.decode = None
self.pydecoder = None
conn = self.connection
if conn is not None:
- if conn.encoding == "UTF8":
+ if conn.client_encoding == "UTF8":
self.is_utf8 = 1
- elif conn.encoding != "SQL_ASCII":
+ elif conn.client_encoding != "SQL_ASCII":
self.pydecoder = conn.codec.decode
else:
self.pydecoder = codecs.lookup("utf8").decode
def test_get_encoding(conn):
(enc,) = conn.cursor().execute("show client_encoding").fetchone()
- assert enc == conn.encoding
+ assert enc == conn.client_encoding
def test_set_encoding(conn):
- newenc = "LATIN1" if conn.encoding != "LATIN1" else "UTF8"
- assert conn.encoding != newenc
- conn.encoding = newenc
- assert conn.encoding == newenc
+ newenc = "LATIN1" if conn.client_encoding != "LATIN1" else "UTF8"
+ assert conn.client_encoding != newenc
+ conn.client_encoding = newenc
+ assert conn.client_encoding == newenc
(enc,) = conn.cursor().execute("show client_encoding").fetchone()
assert enc == newenc
],
)
def test_normalize_encoding(conn, enc, out, codec):
- conn.encoding = enc
- assert conn.encoding == out
+ conn.client_encoding = enc
+ assert conn.client_encoding == out
assert conn.codec.name == codec
def test_encoding_env_var(dsn, monkeypatch, enc, out, codec):
monkeypatch.setenv("PGCLIENTENCODING", enc)
conn = psycopg3.connect(dsn)
- assert conn.encoding == out
+ assert conn.client_encoding == out
assert conn.codec.name == codec
def test_set_encoding_unsupported(conn):
- conn.encoding = "EUC_TW"
+ conn.client_encoding = "EUC_TW"
with pytest.raises(psycopg3.NotSupportedError):
conn.cursor().execute("select 1")
def test_set_encoding_bad(conn):
with pytest.raises(psycopg3.DatabaseError):
- conn.encoding = "WAT"
+ conn.client_encoding = "WAT"
@pytest.mark.parametrize(
cur = aconn.cursor()
await cur.execute("show client_encoding")
(enc,) = await cur.fetchone()
- assert enc == aconn.encoding
+ assert enc == aconn.client_encoding
async def test_set_encoding(aconn):
- newenc = "LATIN1" if aconn.encoding != "LATIN1" else "UTF8"
- assert aconn.encoding != newenc
+ newenc = "LATIN1" if aconn.client_encoding != "LATIN1" else "UTF8"
+ assert aconn.client_encoding != newenc
with pytest.raises(AttributeError):
- aconn.encoding = newenc
- assert aconn.encoding != newenc
+ aconn.client_encoding = newenc
+ assert aconn.client_encoding != newenc
await aconn.set_client_encoding(newenc)
- assert aconn.encoding == newenc
+ assert aconn.client_encoding == newenc
cur = aconn.cursor()
await cur.execute("show client_encoding")
(enc,) = await cur.fetchone()
)
async def test_normalize_encoding(aconn, enc, out, codec):
await aconn.set_client_encoding(enc)
- assert aconn.encoding == out
+ assert aconn.client_encoding == out
assert aconn.codec.name == codec
async def test_encoding_env_var(dsn, monkeypatch, enc, out, codec):
monkeypatch.setenv("PGCLIENTENCODING", enc)
aconn = await psycopg3.AsyncConnection.connect(dsn)
- assert aconn.encoding == out
+ assert aconn.client_encoding == out
assert aconn.codec.name == codec
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
- conn.encoding = "utf8"
+ conn.client_encoding = "utf8"
with cur.copy("copy copy_in from stdin (format text)") as copy:
for i in range(1, 256):
copy.write_row((i, None, chr(i)))
@pytest.mark.parametrize("encoding", ["utf8", "latin9"])
def test_query_encode(conn, encoding):
- conn.encoding = encoding
+ conn.client_encoding = encoding
cur = conn.cursor()
(res,) = cur.execute("select '\u20ac'").fetchone()
assert res == "\u20ac"
def test_query_badenc(conn):
- conn.encoding = "latin1"
+ conn.client_encoding = "latin1"
cur = conn.cursor()
with pytest.raises(UnicodeEncodeError):
cur.execute("select '\u20ac'")
msgs = []
conn.pgconn.exec_(b"set client_min_messages to notice")
conn.add_notice_handler(lambda diag: msgs.append(diag.message_primary))
- conn.encoding = enc
+ conn.client_encoding = enc
cur = conn.cursor()
cur.execute(
"do $$begin raise notice 'hello %', chr(8364); end$$ language plpgsql"
@pytest.mark.parametrize("enc", ["utf8", "latin9"])
def test_error_encoding(conn, enc):
- conn.encoding = enc
+ conn.client_encoding = enc
cur = conn.cursor()
with pytest.raises(e.DatabaseError) as excinfo:
cur.execute(
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
- conn.encoding = encoding
+ conn.client_encoding = encoding
(res,) = cur.execute(f"select {ph}::bytea", (eur,)).fetchone()
assert res == eur.encode("utf8")
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
- conn.encoding = "sql_ascii"
+ conn.client_encoding = "sql_ascii"
(res,) = cur.execute(f"select ascii({ph})", (eur,)).fetchone()
assert res == ord(eur)
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
- conn.encoding = "latin1"
+ conn.client_encoding = "latin1"
with pytest.raises(UnicodeEncodeError):
cur.execute(f"select {ph}::bytea", (eur,))
def test_load_enc(conn, typename, encoding, fmt_out):
cur = conn.cursor(format=fmt_out)
- conn.encoding = encoding
+ conn.client_encoding = encoding
(res,) = cur.execute(
f"select chr(%s::int)::{typename}", (ord(eur),)
).fetchone()
def test_load_badenc(conn, typename, fmt_out):
cur = conn.cursor(format=fmt_out)
- conn.encoding = "latin1"
+ conn.client_encoding = "latin1"
with pytest.raises(psycopg3.DatabaseError):
cur.execute(f"select chr(%s::int)::{typename}", (ord(eur),))
def test_load_ascii(conn, typename, fmt_out):
cur = conn.cursor(format=fmt_out)
- conn.encoding = "sql_ascii"
+ conn.client_encoding = "sql_ascii"
(res,) = cur.execute(
f"select chr(%s::int)::{typename}", (ord(eur),)
).fetchone()
def test_load_ascii_encanyway(conn, typename, fmt_out):
cur = conn.cursor(format=fmt_out)
- conn.encoding = "sql_ascii"
+ conn.client_encoding = "sql_ascii"
(res,) = cur.execute(f"select 'aa'::{typename}").fetchone()
assert res == "aa"
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
def test_text_array_ascii(conn, fmt_in, fmt_out):
- conn.encoding = "sql_ascii"
+ conn.client_encoding = "sql_ascii"
cur = conn.cursor(format=fmt_out)
a = list(map(chr, range(1, 256))) + [eur]
exp = [s.encode("utf8") for s in a]