+"""
+Mappings between PostgreSQL and Python encodings.
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
py_codecs = {
"BIG5": "big5",
"EUC_CN": "gb2312",
"EUC_JIS_2004": "euc_jis_2004",
"EUC_JP": "euc_jp",
"EUC_KR": "euc_kr",
- "EUC_TW": None, # not available in Python
+ # "EUC_TW": not available in Python
"GB18030": "gb18030",
"GBK": "gbk",
"ISO_8859_5": "iso8859-5",
"LATIN7": "iso8859-13",
"LATIN8": "iso8859-14",
"LATIN9": "iso8859-15",
- "MULE_INTERNAL": None, # not available in Python
+ # "MULE_INTERNAL": not available in Python
"SHIFT_JIS_2004": "shift_jis_2004",
"SJIS": "shift_jis",
"SQL_ASCII": None, # means no encoding, see PostgreSQL docs
TransactionStatus,
Ping,
)
-from .encodings import py_codecs
from . import _pq_ctypes as impl
@classmethod
def connect(cls, conninfo):
- if isinstance(conninfo, str):
- conninfo = conninfo.encode("utf8")
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {conninfo!r} instead")
@classmethod
def connect_start(cls, conninfo):
- if isinstance(conninfo, str):
- conninfo = conninfo.encode("utf8")
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {conninfo!r} instead")
@classmethod
def ping(self, conninfo):
- if isinstance(conninfo, str):
- conninfo = conninfo.encode("utf8")
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {conninfo!r} instead")
@property
def db(self):
- return self._decode(impl.PQdb(self.pgconn_ptr))
+ return impl.PQdb(self.pgconn_ptr)
@property
def user(self):
- return self._decode(impl.PQuser(self.pgconn_ptr))
+ return impl.PQuser(self.pgconn_ptr)
@property
def password(self):
- return self._decode(impl.PQpass(self.pgconn_ptr))
+ return impl.PQpass(self.pgconn_ptr)
@property
def host(self):
- return self._decode(impl.PQhost(self.pgconn_ptr))
+ return impl.PQhost(self.pgconn_ptr)
@property
def hostaddr(self):
- return self._decode(impl.PQhostaddr(self.pgconn_ptr))
+ return impl.PQhostaddr(self.pgconn_ptr)
@property
def port(self):
- return self._decode(impl.PQport(self.pgconn_ptr))
+ return impl.PQport(self.pgconn_ptr)
@property
def tty(self):
- return self._decode(impl.PQtty(self.pgconn_ptr))
+ return impl.PQtty(self.pgconn_ptr)
@property
def options(self):
- return self._decode(impl.PQoptions(self.pgconn_ptr))
+ return impl.PQoptions(self.pgconn_ptr)
@property
def status(self):
return TransactionStatus(rv)
def parameter_status(self, name):
- rv = impl.PQparameterStatus(
- self.pgconn_ptr, self._encode(name, "utf8")
- )
- return self._decode(rv, "utf8")
+ return impl.PQparameterStatus(self.pgconn_ptr, name)
@property
def protocol_version(self):
@property
def error_message(self):
- return self._decode(impl.PQerrorMessage(self.pgconn_ptr))
+ return impl.PQerrorMessage(self.pgconn_ptr)
@property
def socket(self):
return bool(impl.PQsslInUse(self.pgconn_ptr))
def exec_(self, command):
- rv = impl.PQexec(self.pgconn_ptr, self._encode(command))
+ if not isinstance(command, bytes):
+ raise TypeError(f"bytes expected, got {command!r} instead")
+ rv = impl.PQexec(self.pgconn_ptr, command)
if rv is None:
raise MemoryError("couldn't allocate PGresult")
return PGresult(rv)
- def _encode(self, s, py_enc=None):
- if isinstance(s, bytes):
- return s
- elif isinstance(s, str):
- if py_enc is None:
- pg_enc = self.parameter_status("client_encoding")
- py_enc = py_codecs[pg_enc]
- if py_enc is None:
- raise PQerror(
- f"PostgreSQL encoding {pg_enc} doesn't have a Python codec."
- f" Please use bytes instead of str"
- )
- return s.encode(py_enc)
- else:
- raise TypeError(f"expected bytes or str, got {s!r} instead")
-
- def _decode(self, b, py_enc=None):
- if b is None:
- return None
-
- if py_enc is None:
- pg_enc = self.parameter_status("client_encoding")
- py_enc = py_codecs[pg_enc]
-
- if py_enc is not None:
- return b.decode(py_enc)
- else:
- # pretty much a punt, but this is only for communication, no data
- return b.decode("utf8", "replace")
-
class PGresult:
__slots__ = ("pgresult_ptr",)
@classmethod
def parse(cls, conninfo):
- if isinstance(conninfo, str):
- conninfo = conninfo.encode("utf8")
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {conninfo!r} instead")
if not errmsg:
raise MemoryError("couldn't allocate on conninfo parse")
else:
- exc = PQerror(errmsg.value.decode("utf8", "replace"))
+ exc = PQerror(errmsg.value)
impl.PQfreemem(errmsg)
raise exc
@classmethod
def _options_from_array(cls, opts):
- def gets(opt, kw):
- rv = getattr(opt, kw)
- if rv is not None:
- rv = rv.decode("utf8", "replace")
- return rv
-
rv = []
skws = "keyword envvar compiled val label dispatcher".split()
for opt in opts:
if not opt.keyword:
break
- d = {kw: gets(opt, kw) for kw in skws}
+ d = {kw: getattr(opt, kw) for kw in skws}
d["dispsize"] = opt.dispsize
rv.append(ConninfoOption(**d))
dsn = request.config.getoption("--test-dsn")
if not dsn:
pytest.skip("skipping test as no --test-dsn")
- return dsn
+ return dsn.encode("utf8")
@pytest.fixture
tempenv["PGPORT"] = "15432"
defs = pq.Conninfo.get_defaults()
assert len(defs) > 20
- port = [d for d in defs if d.keyword == "port"][0]
- assert port.envvar == "PGPORT"
- assert port.compiled == "5432"
- assert port.val == "15432"
- assert port.label == "Database-Port"
- assert port.dispatcher == ""
+ port = [d for d in defs if d.keyword == b"port"][0]
+ assert port.envvar == b"PGPORT"
+ assert port.compiled == b"5432"
+ assert port.val == b"15432"
+ assert port.label == b"Database-Port"
+ assert port.dispatcher == b""
assert port.dispsize == 6
def test_conninfo_parse(pq):
info = pq.Conninfo.parse(
- "postgresql://host1:123,host2:456/somedb"
- "?target_session_attrs=any&application_name=myapp"
+ b"postgresql://host1:123,host2:456/somedb"
+ b"?target_session_attrs=any&application_name=myapp"
)
info = {i.keyword: i.val for i in info if i.val is not None}
- assert info["host"] == "host1,host2"
- assert info["port"] == "123,456"
- assert info["dbname"] == "somedb"
- assert info["application_name"] == "myapp"
+ assert info[b"host"] == b"host1,host2"
+ assert info[b"port"] == b"123,456"
+ assert info[b"dbname"] == b"somedb"
+ assert info[b"application_name"] == b"myapp"
def test_conninfo_parse_bad(pq):
with pytest.raises(pq.PQerror) as e:
- pq.Conninfo.parse("bad_conninfo=")
+ pq.Conninfo.parse(b"bad_conninfo=")
assert "bad_conninfo" in str(e.value)
#!/usr/bin/env python3
+import pytest
+
+
+def test_exec_none(pq, pgconn):
+ with pytest.raises(TypeError):
+ pgconn.exec_(None)
+
def test_exec_empty(pq, pgconn):
res = pgconn.exec_(b"")
def test_exec_command(pq, pgconn):
- res = pgconn.exec_("set timezone to utc")
+ res = pgconn.exec_(b"set timezone to utc")
assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
def test_exec_error(pq, pgconn):
- res = pgconn.exec_("wat")
+ res = pgconn.exec_(b"wat")
assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
assert conn.status == pq.ConnStatus.CONNECTION_OK, conn.error_message
-def test_connectdb_bytes(pq, dsn):
- conn = pq.PGconn.connect(dsn.encode("utf8"))
- assert conn.status == pq.ConnStatus.CONNECTION_OK, conn.error_message
-
-
def test_connectdb_error(pq):
- conn = pq.PGconn.connect("dbname=psycopg3_test_not_for_real")
+ conn = pq.PGconn.connect(b"dbname=psycopg3_test_not_for_real")
assert conn.status == pq.ConnStatus.CONNECTION_BAD
def test_connect_async_bad(pq, dsn):
- conn = pq.PGconn.connect_start("dbname=psycopg3_test_not_for_real")
+ conn = pq.PGconn.connect_start(b"dbname=psycopg3_test_not_for_real")
while 1:
assert conn.status != pq.ConnStatus.CONNECTION_BAD
rv = conn.connect_poll()
def test_info(pq, dsn, pgconn):
info = pgconn.info
assert len(info) > 20
- dbname = [d for d in info if d.keyword == "dbname"][0]
- assert dbname.envvar == "PGDATABASE"
- assert dbname.label == "Database-Name"
- assert dbname.dispatcher == ""
+ dbname = [d for d in info if d.keyword == b"dbname"][0]
+ assert dbname.envvar == b"PGDATABASE"
+ assert dbname.label == b"Database-Name"
+ assert dbname.dispatcher == b""
assert dbname.dispsize == 20
parsed = pq.Conninfo.parse(dsn)
- name = [o.val for o in parsed if o.keyword == "dbname"][0]
+ name = [o.val for o in parsed if o.keyword == b"dbname"][0]
assert dbname.val == name
rv = pq.PGconn.ping(dsn)
assert rv == pq.Ping.PQPING_OK
- rv = pq.PGconn.ping("port=99999")
+ rv = pq.PGconn.ping(b"port=99999")
assert rv == pq.Ping.PQPING_NO_RESPONSE
def test_db(pgconn):
- name = [o.val for o in pgconn.info if o.keyword == "dbname"][0]
+ name = [o.val for o in pgconn.info if o.keyword == b"dbname"][0]
assert pgconn.db == name
def test_user(pgconn):
- user = [o.val for o in pgconn.info if o.keyword == "user"][0]
+ user = [o.val for o in pgconn.info if o.keyword == b"user"][0]
assert pgconn.user == user
def test_password(pgconn):
# not in info
- assert isinstance(pgconn.password, str)
+ assert isinstance(pgconn.password, bytes)
def test_host(pgconn):
# might be not in info
- assert isinstance(pgconn.host, str)
+ assert isinstance(pgconn.host, bytes)
def test_hostaddr(pgconn):
# not in info
- assert isinstance(pgconn.hostaddr, str)
+ assert isinstance(pgconn.hostaddr, bytes)
def test_tty(pgconn):
- tty = [o.val for o in pgconn.info if o.keyword == "tty"][0]
+ tty = [o.val for o in pgconn.info if o.keyword == b"tty"][0]
assert pgconn.tty == tty
def test_parameter_status(pq, dsn, tempenv):
tempenv["PGAPPNAME"] = "psycopg3 tests"
pgconn = pq.PGconn.connect(dsn)
- assert pgconn.parameter_status("application_name") == "psycopg3 tests"
- assert pgconn.parameter_status("wat") is None
+ assert pgconn.parameter_status(b"application_name") == b"psycopg3 tests"
+ assert pgconn.parameter_status(b"wat") is None
def test_encoding(pq, pgconn):
- res = pgconn.exec_("set client_encoding to latin1")
+ res = pgconn.exec_(b"set client_encoding to latin1")
assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
- assert pgconn.parameter_status("client_encoding") == "LATIN1"
+ assert pgconn.parameter_status(b"client_encoding") == b"LATIN1"
- res = pgconn.exec_("set client_encoding to 'utf-8'")
+ res = pgconn.exec_(b"set client_encoding to 'utf-8'")
assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
- assert pgconn.parameter_status("client_encoding") == "UTF8"
+ assert pgconn.parameter_status(b"client_encoding") == b"UTF8"
- res = pgconn.exec_("set client_encoding to wat")
+ res = pgconn.exec_(b"set client_encoding to wat")
assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
- assert pgconn.parameter_status("client_encoding") == "UTF8"
+ assert pgconn.parameter_status(b"client_encoding") == b"UTF8"
def test_protocol_version(pgconn):
def test_error_message(pq, pgconn):
- res = pgconn.exec_("set client_encoding to latin9")
- assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
- res = pgconn.exec_(b"set client_encoding to '\xa4'") # euro sign in latin9
+ res = pgconn.exec_(b"wat")
+ assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
msg = pgconn.error_message
- assert isinstance(msg, str) # decoded
- assert "\u20ac" in msg # decoded ok
+ assert b"wat" in msg
def test_backend_pid(pgconn):
info = pq.Conninfo.parse(dsn)
has_password = (
"PGPASSWORD" in tempenv
- or [i for i in info if i.keyword == "password"][0].val is not None
+ or [i for i in info if i.keyword == b"password"][0].val is not None
)
if has_password:
assert pgconn.used_password
assert isinstance(pgconn.ssl_in_use, bool)
# If connecting via socket then ssl is not in use
- if pgconn.host.startswith("/"):
+ if pgconn.host.startswith(b"/"):
assert not pgconn.ssl_in_use
else:
- sslmode = [i.val for i in pgconn.info if i.keyword == "sslmode"][0]
- if sslmode not in ("disable", "allow"):
+ sslmode = [i.val for i in pgconn.info if i.keyword == b"sslmode"][0]
+ if sslmode not in (b"disable", b"allow"):
# 'prefer' may still connect without ssl
# but maybe unlikely in the tests environment?
assert pgconn.ssl_in_use