"utf8" is the default: not passing it is marginally faster.
*.egg-info/
.tox
/.eggs/
-/build
-/dist
+build/
+dist/
*.pstats
.mypy_cache
__pycache__/
if value >= self.prepare_threshold or prepare:
# The query has been executed enough times and needs to be prepared
- name = f"_pg3_{self._prepared_idx}".encode("utf-8")
+ name = f"_pg3_{self._prepared_idx}".encode()
self._prepared_idx += 1
return Prepare.SHOULD, name
else:
try:
return _timezones[tzname]
except KeyError:
- sname = tzname.decode("utf8") if tzname else "UTC"
+ sname = tzname.decode() if tzname else "UTC"
try:
zi: tzinfo = ZoneInfo(sname)
except KeyError:
else:
raise e.InterfaceError(
f"unexpected result {ExecStatus(result.status).name}"
- f" from command {command.decode('utf8')!r}"
+ f" from command {command.decode()!r}"
)
return result
if self.isolation_level is not None:
val = IsolationLevel(self.isolation_level)
parts.append(b"ISOLATION LEVEL")
- parts.append(val.name.replace("_", " ").encode("utf8"))
+ parts.append(val.name.replace("_", " ").encode())
if self.read_only is not None:
parts.append(b"READ ONLY" if self.read_only else b"READ WRITE")
"""
opts = _parse_conninfo(conninfo)
rv = {
- opt.keyword.decode("utf8"): opt.val.decode("utf8")
+ opt.keyword.decode(): opt.val.decode()
for opt in opts
if opt.val is not None
}
Return the result of pq.Conninfo.parse() on success.
"""
try:
- return pq.Conninfo.parse(conninfo.encode("utf8"))
+ return pq.Conninfo.parse(conninfo.encode())
except e.OperationalError as ex:
raise e.ProgrammingError(str(ex))
}
# Not returned by the libpq. Bug? Bet we're using SSH.
defaults.setdefault(b"channel_binding", b"prefer")
- defaults[b"passfile"] = str(Path.home() / ".pgpass").encode("utf-8")
+ defaults[b"passfile"] = str(Path.home() / ".pgpass").encode()
return {
i.keyword.decode(pyenc): i.val.decode(pyenc)
`!None` if the cursor doesn't have a result available.
"""
msg = self.pgresult.command_status if self.pgresult else None
- if msg:
- return msg.decode("utf-8")
- else:
- return None
+ return msg.decode() if msg else None
def _make_row_maker(self) -> RowMaker[Row]:
raise NotImplementedError
py_codecs: Dict[Union[bytes, str], str] = {}
py_codecs.update((k, v) for k, v in _py_codecs.items())
-py_codecs.update((k.encode("utf-8"), v) for k, v in _py_codecs.items())
+py_codecs.update((k.encode(), v) for k, v in _py_codecs.items())
-pg_codecs = {v: k.encode("utf-8") for k, v in _py_codecs.items()}
+pg_codecs = {v: k.encode() for k, v in _py_codecs.items()}
def py2pg(name: str) -> bytes:
Generator to create a database connection without blocking.
"""
- conn = pq.PGconn.connect_start(conninfo.encode("utf8"))
+ conn = pq.PGconn.connect_start(conninfo.encode())
while 1:
if conn.status == ConnStatus.BAD:
raise e.OperationalError(
status = TransactionStatus(pgconn.transaction_status).name
if not pgconn.host.startswith(b"/"):
- parts.append(("host", pgconn.host.decode("utf-8")))
+ parts.append(("host", pgconn.host.decode()))
if pgconn.port != b"5432":
- parts.append(("port", pgconn.port.decode("utf-8")))
+ parts.append(("port", pgconn.port.decode()))
if pgconn.user != pgconn.db:
- parts.append(("user", pgconn.user.decode("utf-8")))
- parts.append(("database", pgconn.db.decode("utf-8")))
+ parts.append(("user", pgconn.user.decode()))
+ parts.append(("database", pgconn.db.decode()))
else:
status = ConnStatus(pgconn.status).name
if sd.oid != INVALID_OID:
info = self._get_base_type_info(sd.oid)
dumper.oid = info.array_oid or TEXT_ARRAY_OID
- dumper.delimiter = info.delimiter.encode("utf-8")
+ dumper.delimiter = info.delimiter.encode()
else:
dumper.oid = INVALID_OID
name = f"{info.name.title()}{base.__name__}"
attribs = {
"base_oid": info.oid,
- "delimiter": info.delimiter.encode("utf-8"),
+ "delimiter": info.delimiter.encode(),
}
loader = type(name, (base,), attribs)
adapters.register_loader(info.array_oid, loader)
attribs = {
"oid": info.array_oid,
"element_oid": info.oid,
- "delimiter": info.delimiter.encode("utf-8"),
+ "delimiter": info.delimiter.encode(),
}
dumper = type(name, (base,), attribs)
adapters.register_dumper(None, dumper)
def dump(self, obj: date) -> bytes:
# NOTE: whatever the PostgreSQL DateStyle input format (DMY, MDY, YMD)
# the YYYY-MM-DD is always understood correctly.
- return str(obj).encode("utf8")
+ return str(obj).encode()
class DateBinaryDumper(Dumper):
class _BaseTimeTextDumper(_BaseTimeDumper):
def dump(self, obj: time) -> bytes:
- return str(obj).encode("utf8")
+ return str(obj).encode()
class TimeDumper(_BaseTimeTextDumper):
def dump(self, obj: datetime) -> bytes:
# NOTE: whatever the PostgreSQL DateStyle input format (DMY, MDY, YMD)
# the YYYY-MM-DD is always understood correctly.
- return str(obj).encode("utf8")
+ return str(obj).encode()
class DatetimeDumper(_BaseDatetimeTextDumper):
setattr(self, "dump", self._dump_sql)
def dump(self, obj: timedelta) -> bytes:
- return str(obj).encode("utf8")
+ return str(obj).encode()
def _dump_sql(self, obj: timedelta) -> bytes:
# sql_standard format needs explicit signs
def dump(self, obj: _JsonWrapper) -> bytes:
dumps = obj.dumps or self.dumps
- return dumps(obj.obj).encode("utf-8")
+ return dumps(obj.obj).encode()
class JsonDumper(_JsonDumper):
def dump(self, obj: _JsonWrapper) -> bytes:
dumps = obj.dumps or self.dumps
- return b"\x01" + dumps(obj.obj).encode("utf-8")
+ return b"\x01" + dumps(obj.obj).encode()
class _JsonLoader(Loader):
oid = postgres.types["inet"].oid
def dump(self, obj: Interface) -> bytes:
- return str(obj).encode("utf8")
+ return str(obj).encode()
class NetworkDumper(Dumper):
oid = postgres.types["cidr"].oid
def dump(self, obj: Network) -> bytes:
- return str(obj).encode("utf8")
+ return str(obj).encode()
class AddressBinaryDumper(Dumper):
data = bytes(data)
if b"/" in data:
- return ip_interface(data.decode("utf8"))
+ return ip_interface(data.decode())
else:
- return ip_address(data.decode("utf8"))
+ return ip_address(data.decode())
class InetBinaryLoader(_LazyIpaddress):
if isinstance(data, memoryview):
data = bytes(data)
- return ip_network(data.decode("utf8"))
+ return ip_network(data.decode())
class CidrBinaryLoader(_LazyIpaddress):
else:
return IPv6Network((packed, prefix))
- return ip_network(data.decode("utf8"))
+ return ip_network(data.decode())
def register_default_adapters(context: AdaptContext) -> None:
class _NumberDumper(Dumper):
def dump(self, obj: Any) -> bytes:
- return str(obj).encode("utf8")
+ return str(obj).encode()
def quote(self, obj: Any) -> bytes:
value = self.dump(obj)
# cover NaN and sNaN
return b"NaN"
else:
- return str(obj).encode("utf8")
+ return str(obj).encode()
_special = {
b"Infinity": b"'Infinity'::numeric",
def load(self, data: Buffer) -> Decimal:
if isinstance(data, memoryview):
data = bytes(data)
- return Decimal(data.decode("utf8"))
+ return Decimal(data.decode())
DEC_DIGITS = 4 # decimal digits per Postgres "digit"
oid = postgres.types["uuid"].oid
def dump(self, obj: "uuid.UUID") -> bytes:
- return obj.hex.encode("utf8")
+ return obj.hex.encode()
class UUIDBinaryDumper(UUIDDumper):
def load(self, data: Buffer) -> "uuid.UUID":
if isinstance(data, memoryview):
data = bytes(data)
- return UUID(data.decode("utf8"))
+ return UUID(data.decode())
class UUIDBinaryLoader(UUIDLoader):
Generator to create a database connection without blocking.
"""
- cdef pq.PGconn conn = pq.PGconn.connect_start(conninfo.encode("utf8"))
+ cdef pq.PGconn conn = pq.PGconn.connect_start(conninfo.encode())
logger.debug("connection started, status %s", conn.status)
cdef libpq.PGconn *pgconn_ptr = conn._pgconn_ptr
cdef int conn_status = libpq.PQstatus(pgconn_ptr)
if ptr != NULL:
return <object>ptr
- sname = tzname.decode("utf8") if tzname else "UTC"
+ sname = tzname.decode() if tzname else "UTC"
try:
zi = ZoneInfo(sname)
except KeyError:
self._bytes_encoding = b"utf-8"
self.is_utf8 = 1
else:
- self._bytes_encoding = pg2py(pgenc).encode("utf-8")
+ self._bytes_encoding = pg2py(pgenc).encode()
if self._bytes_encoding == b"ascii":
self.is_utf8 = 1
self.encoding = PyBytes_AsString(self._bytes_encoding)
self._bytes_encoding = b"utf-8"
self.is_utf8 = 1
else:
- self._bytes_encoding = pg2py(pgenc).encode("utf-8")
+ self._bytes_encoding = pg2py(pgenc).encode()
if pgenc == b"SQL_ASCII":
self.encoding = NULL
log.error(f"couldn't run {pg_config!r} --{what}: %s", e)
raise
else:
- return out.stdout.strip().decode("utf8")
+ return out.stdout.strip().decode()
class psycopg_build_ext(build_ext):
self._cls = cls
def dump(self, obj: str) -> bytes:
- return (obj * 2).encode("utf-8")
+ return (obj * 2).encode()
def quote(self, obj: str) -> bytes:
value = self.dump(obj)
pass
def load(self, data: Buffer) -> str:
- return (bytes(data) * 2).decode("utf-8")
+ return (bytes(data) * 2).decode()
# This should be the definition of psycopg.adapt.DumperKey, but mypy doesn't
"""Return a PGconn connection open to `--test-dsn`."""
from psycopg import pq
- conn = pq.PGconn.connect(dsn.encode("utf8"))
+ conn = pq.PGconn.connect(dsn.encode())
if conn.status != pq.ConnStatus.OK:
pytest.fail(
f"bad connection: {conn.error_message.decode('utf8', 'replace')}"
def test_escape_string_badenc(pgconn):
res = pgconn.exec_(b"set client_encoding to 'UTF8'")
assert res.status == pq.ExecStatus.COMMAND_OK
- data = "\u20ac".encode("utf8")[:-1]
+ data = "\u20ac".encode()[:-1]
esc = pq.Escaping(pgconn)
with pytest.raises(psycopg.OperationalError):
esc.escape_string(data)
def test_connectdb(dsn):
- conn = pq.PGconn.connect(dsn.encode("utf8"))
+ conn = pq.PGconn.connect(dsn.encode())
assert conn.status == pq.ConnStatus.OK, conn.error_message
def test_connect_async(dsn):
- conn = pq.PGconn.connect_start(dsn.encode("utf8"))
+ conn = pq.PGconn.connect_start(dsn.encode())
conn.nonblocking = 1
while 1:
assert conn.status != pq.ConnStatus.BAD
def test_connect_async_bad(dsn):
parsed_dsn = {
- e.keyword: e.val
- for e in pq.Conninfo.parse(dsn.encode("utf8"))
- if e.val
+ e.keyword: e.val for e in pq.Conninfo.parse(dsn.encode()) if e.val
}
parsed_dsn[b"dbname"] = b"psycopg_test_not_for_real"
dsn = b" ".join(b"%s='%s'" % item for item in parsed_dsn.items())
def test_weakref(dsn):
- conn = pq.PGconn.connect(dsn.encode("utf8"))
+ conn = pq.PGconn.connect(dsn.encode())
w = weakref.ref(conn)
conn.finish()
del conn
assert dbname.dispchar == b""
assert dbname.dispsize == 20
- parsed = pq.Conninfo.parse(dsn.encode("utf8"))
+ parsed = pq.Conninfo.parse(dsn.encode())
name = [o.val for o in parsed if o.keyword == b"dbname"][0]
user = [o.val for o in parsed if o.keyword == b"user"][0]
assert dbname.val == (name or user)
def test_ping(dsn):
- rv = pq.PGconn.ping(dsn.encode("utf8"))
+ rv = pq.PGconn.ping(dsn.encode())
assert rv == pq.Ping.OK
rv = pq.PGconn.ping(b"port=9999")
def test_parameter_status(dsn, monkeypatch):
monkeypatch.setenv("PGAPPNAME", "psycopg tests")
- pgconn = pq.PGconn.connect(dsn.encode("utf8"))
+ pgconn = pq.PGconn.connect(dsn.encode())
assert pgconn.parameter_status(b"application_name") == b"psycopg tests"
assert pgconn.parameter_status(b"wat") is None
pgconn.finish()
def test_socket(pgconn):
socket = pgconn.socket
assert socket > 0
- pgconn.exec_(
- f"select pg_terminate_backend({pgconn.backend_pid})".encode("utf8")
- )
+ pgconn.exec_(f"select pg_terminate_backend({pgconn.backend_pid})".encode())
# TODO: on my box it raises OperationalError as it should. Not on Travis,
# so let's see if at least an ok value comes out of it.
try:
# Note that the server may still need a password passed via pgpass
# so it may be that has_password is false but still a password was
# requested by the server and passed by libpq.
- info = pq.Conninfo.parse(dsn.encode("utf8"))
+ info = pq.Conninfo.parse(dsn.encode())
has_password = (
"PGPASSWORD" in os.environ
or [i for i in info if i.keyword == b"password"][0].val is not None
pgconn.finish()
assert "[BAD]" in str(pgconn)
- pgconn2 = pq.PGconn.connect_start(dsn.encode("utf8"))
+ pgconn2 = pq.PGconn.connect_start(dsn.encode())
assert "[" in str(pgconn2)
assert "[IDLE]" not in str(pgconn2)
class MyStrDumper(StrDumper):
def dump(self, obj):
- return (obj * 2).encode("utf-8")
+ return (obj * 2).encode()
conn.adapters.register_dumper(str, MyStrDumper)
assert conn.execute("select %t", ["hello"]).fetchone()[0] == "hellohello"
class MyTextLoader(TextLoader):
def load(self, data):
- return (bytes(data) * 2).decode("utf-8")
+ return (bytes(data) * 2).decode()
conn.adapters.register_loader("text", MyTextLoader)
assert conn.execute("select 'hello'::text").fetchone()[0] == "hellohello"
)
def test_normalize_encoding(conn, enc, out, codec):
conn.client_encoding = enc
- assert (
- conn.pgconn.parameter_status(b"client_encoding").decode("utf-8") == out
- )
+ assert conn.pgconn.parameter_status(b"client_encoding").decode() == out
assert conn.client_encoding == codec
def test_encoding_env_var(dsn, monkeypatch, enc, out, codec):
monkeypatch.setenv("PGCLIENTENCODING", enc)
conn = psycopg.connect(dsn)
- assert (
- conn.pgconn.parameter_status(b"client_encoding").decode("utf-8") == out
- )
+ assert conn.pgconn.parameter_status(b"client_encoding").decode() == out
assert conn.client_encoding == codec
)
async def test_normalize_encoding(aconn, enc, out, codec):
await aconn.set_client_encoding(enc)
- assert (
- aconn.pgconn.parameter_status(b"client_encoding").decode("utf-8")
- == out
- )
+ assert aconn.pgconn.parameter_status(b"client_encoding").decode() == out
assert aconn.client_encoding == codec
async def test_encoding_env_var(dsn, monkeypatch, enc, out, codec):
monkeypatch.setenv("PGCLIENTENCODING", enc)
aconn = await psycopg.AsyncConnection.connect(dsn)
- assert (
- aconn.pgconn.parameter_status(b"client_encoding").decode("utf-8")
- == out
- )
+ assert aconn.pgconn.parameter_status(b"client_encoding").decode() == out
assert aconn.client_encoding == codec
pytest.skip("hostaddr not supported on libpq < 12")
info_val = getattr(conn.info, info_attr)
- pgconn_val = getattr(conn.pgconn, pgconn_attr).decode("utf-8")
+ pgconn_val = getattr(conn.pgconn, pgconn_attr).decode()
assert info_val == pgconn_val
conn.close()
conn.info.hostaddr
def test_port(self, conn):
- assert conn.info.port == int(conn.pgconn.port.decode("utf-8"))
+ assert conn.info.port == int(conn.pgconn.port.decode())
conn.close()
with pytest.raises(psycopg.OperationalError):
conn.info.port
def test_no_password(self, dsn):
dsn2 = make_conninfo(dsn, password="the-pass-word")
- pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode("utf8"))
+ pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode())
info = ConnectionInfo(pgconn)
assert info.password == "the-pass-word"
assert "password" not in info.get_parameters()
def test_dsn_no_password(self, dsn):
dsn2 = make_conninfo(dsn, password="the-pass-word")
- pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode("utf8"))
+ pgconn = psycopg.pq.PGconn.connect_start(dsn2.encode())
info = ConnectionInfo(pgconn)
assert info.password == "the-pass-word"
assert "password" not in info.dsn
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
with cur.copy("copy copy_in from stdin (format text)") as copy:
- copy.write(sample_text.decode("utf8"))
+ copy.write(sample_text.decode())
data = cur.execute("select * from copy_in order by 1").fetchall()
assert data == sample_records
ensure_table(cur, sample_tabledef)
with pytest.raises(e.QueryCanceled):
with cur.copy("copy copy_in from stdin (format binary)") as copy:
- copy.write(sample_text.decode("utf8"))
+ copy.write(sample_text.decode())
assert conn.pgconn.transaction_status == conn.TransactionStatus.INERROR
cur = conn.cursor()
with cur.copy("copy copy_in from stdin") as copy:
for block in gen.blocks():
- copy.write(block.encode("utf8"))
+ copy.write(block.encode())
gen.assert_data()
if isinstance(item, int):
return bytes([0, 0, 0, item])
elif isinstance(item, str):
- return item.encode("utf8")
+ return item.encode()
return item
if not block:
break
if isinstance(block, str):
- block = block.encode("utf8")
+ block = block.encode()
m.update(block)
return m.hexdigest()
cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
async with cur.copy("copy copy_in from stdin (format text)") as copy:
- await copy.write(sample_text.decode("utf8"))
+ await copy.write(sample_text.decode())
await cur.execute("select * from copy_in order by 1")
data = await cur.fetchall()
await ensure_table(cur, sample_tabledef)
with pytest.raises(e.QueryCanceled):
async with cur.copy("copy copy_in from stdin (format binary)") as copy:
- await copy.write(sample_text.decode("utf8"))
+ await copy.write(sample_text.decode())
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
cur = aconn.cursor()
async with cur.copy("copy copy_in from stdin") as copy:
for block in gen.blocks():
- await copy.write(block.encode("utf8"))
+ await copy.write(block.encode())
await gen.assert_data()
if not block:
break
if isinstance(block, str):
- block = block.encode("utf8")
+ block = block.encode()
m.update(block)
return m.hexdigest()
for rec in cur.stream("select generate_series(1, 2)", binary=False):
recs.append(rec)
assert cur.pgresult.fformat(0) == 0
- assert cur.pgresult.get_value(0, 0) == str(rec[0]).encode("utf8")
+ assert cur.pgresult.get_value(0, 0) == str(rec[0]).encode()
assert recs == [(1,), (2,)]
async for rec in cur.stream("select generate_series(1, 2)", binary=False):
recs.append(rec)
assert cur.pgresult.fformat(0) == 0
- assert cur.pgresult.get_value(0, 0) == str(rec[0]).encode("utf8")
+ assert cur.pgresult.get_value(0, 0) == str(rec[0]).encode()
assert recs == [(1,), (2,)]
],
)
def test_py2pg(pyenc, pgenc):
- assert encodings.py2pg(pyenc) == pgenc.encode("utf8")
+ assert encodings.py2pg(pyenc) == pgenc.encode()
@pytest.mark.parametrize(
],
)
def test_pg2py(pyenc, pgenc):
- assert encodings.pg2py(pgenc.encode("utf-8")) == pyenc
+ assert encodings.pg2py(pgenc.encode()) == pyenc
@pytest.mark.parametrize("pgenc", ["MULE_INTERNAL", "EUC_TW"])
def test_pg2py_missing(pgenc):
with pytest.raises(psycopg.NotSupportedError):
- encodings.pg2py(pgenc.encode("utf-8"))
+ encodings.pg2py(pgenc.encode())
assert len(conn._prepared._prepared) == 5
assert conn._prepared._prepared[b"select 'a'", ()] == b"_pg3_0"
for i in [9, 8, 7, 6]:
- assert conn._prepared._prepared[f"select {i}".encode("utf8"), ()] == 1
+ assert conn._prepared._prepared[f"select {i}".encode(), ()] == 1
cur = conn.execute("select statement from pg_prepared_statements")
assert cur.fetchall() == [("select 'a'",)]
assert len(conn._prepared._prepared) == 5
for i in [9, 8, 7, 6, "'a'"]:
- assert conn._prepared._prepared[
- f"select {i}".encode("utf8"), ()
- ].startswith(b"_pg3_")
+ name = conn._prepared._prepared[f"select {i}".encode(), ()]
+ assert name.startswith(b"_pg3_")
cur = conn.execute(
"select statement from pg_prepared_statements order by prepare_time",
assert len(aconn._prepared._prepared) == 5
assert aconn._prepared._prepared[b"select 'a'", ()] == b"_pg3_0"
for i in [9, 8, 7, 6]:
- assert aconn._prepared._prepared[f"select {i}".encode("utf8"), ()] == 1
+ assert aconn._prepared._prepared[f"select {i}".encode(), ()] == 1
cur = await aconn.execute("select statement from pg_prepared_statements")
assert await cur.fetchall() == [("select 'a'",)]
assert len(aconn._prepared._prepared) == 5
for i in [9, 8, 7, 6, "'a'"]:
assert aconn._prepared._prepared[
- f"select {i}".encode("utf8"), ()
+ f"select {i}".encode(), ()
].startswith(b"_pg3_")
cur = await aconn.execute(
)
conn.client_encoding = "utf8"
- assert sql.Literal(eur).as_bytes(conn) == f"'{eur}'".encode("utf8")
+ assert sql.Literal(eur).as_bytes(conn) == f"'{eur}'".encode()
conn.client_encoding = "latin9"
assert sql.Literal(eur).as_bytes(conn) == f"'{eur}'".encode("latin9")
assert sql.SQL("foo").as_bytes(conn) == b"foo"
conn.client_encoding = "utf8"
- assert sql.SQL(eur).as_bytes(conn) == eur.encode("utf8")
+ assert sql.SQL(eur).as_bytes(conn) == eur.encode()
conn.client_encoding = "latin9"
assert sql.SQL(eur).as_bytes(conn) == eur.encode("latin9")
obj = sql.Composed([sql.SQL("foo"), sql.SQL(eur)])
conn.client_encoding = "utf8"
- assert obj.as_bytes(conn) == ("foo" + eur).encode("utf8")
+ assert obj.as_bytes(conn) == ("foo" + eur).encode()
conn.client_encoding = "latin9"
assert obj.as_bytes(conn) == ("foo" + eur).encode("latin9")
oid = psycopg.postgres.types["box"].oid
def dump(self, box):
- return ("(%s,%s),(%s,%s)" % box.coords).encode("utf8")
+ return ("(%s,%s),(%s,%s)" % box.coords).encode()
conn.adapters.register_dumper(Box, BoxDumper)
)
def test_parse_ok(s, d):
loader = HstoreLoader(dict, None)
- assert loader.load(s.encode("utf8")) == d
+ assert loader.load(s.encode()) == d
@pytest.mark.parametrize(
def test_parse_bad(s):
with pytest.raises(psycopg.DataError):
loader = HstoreLoader(dict, None)
- loader.load(s.encode("utf8"))
+ loader.load(s.encode())
def test_register_conn(hstore, conn):
conn.client_encoding = "ascii"
cur.execute(f"select chr(%s)::{typename}", (ord(eur),))
- assert cur.fetchone()[0] == eur.encode("utf8")
+ assert cur.fetchone()[0] == eur.encode()
stmt = sql.SQL("copy (select chr({})) to stdout (format {})").format(
ord(eur), sql.SQL(fmt_out.name)
copy.set_types([typename])
(res,) = copy.read_row()
- assert res == eur.encode("utf8")
+ assert res == eur.encode()
@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
conn.client_encoding = "ascii"
cur = conn.cursor(binary=fmt_out)
a = list(map(chr, range(1, 256))) + [eur]
- exp = [s.encode("utf8") for s in a]
+ exp = [s.encode() for s in a]
(res,) = cur.execute(f"select %{fmt_in}::text[]", (a,)).fetchone()
assert res == exp