git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Added encoding mapping table between Postgres and Python
author: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 15 Mar 2020 04:09:27 +0000 (17:09 +1300)
committer: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 15 Mar 2020 04:09:27 +0000 (17:09 +1300)
psycopg3/pq_ctypes.py
psycopg3/pq_encodings.py [new file with mode: 0644]
tests/test_pq_exec.py
tests/test_pq_pgconn.py

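diff --git a/psycopg3/pq_ctypes.py b/psycopg3/pq_ctypes.py
index 24f3d300804728ab78c76406ffc7f50234349ba0..94eec08963ced4a2650fe2f774b75fc71431e77e 100644 (file)
--- a/psycopg3/pq_ctypes.py
+++ b/psycopg3/pq_ctypes.py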
@@ -19,6 +19,7 @@ from .pq_enums import (
     TransactionStatus,
     Ping,
 )
+from .pq_encodings import py_codecs
 from . import _pq_ctypes as impl
 
 
@@ -139,8 +140,10 @@ class PGconn:
         return TransactionStatus(rv)
 
     def parameter_status(self, name):
-        rv = impl.PQparameterStatus(self.pgconn_ptr, self._encode(name))
-        return self._decode(rv)
+        rv = impl.PQparameterStatus(
+            self.pgconn_ptr, self._encode(name, "utf8")
+        )
+        return self._decode(rv, "utf8")
 
     @property
     def protocol_version(self):
@@ -180,20 +183,35 @@ class PGconn:
             raise MemoryError("couldn't allocate PGresult")
         return PGresult(rv)
 
-    def _encode(self, s):
+    def _encode(self, s, py_enc=None):
         if isinstance(s, bytes):
             return s
         elif isinstance(s, str):
-            # TODO: encode in client encoding
-            return s.encode("utf8")
+            if py_enc is None:
+                pg_enc = self.parameter_status("client_encoding")
+                py_enc = py_codecs[pg_enc]
+                if py_enc is None:
+                    raise PQerror(
+                        f"PostgreSQL encoding {pg_enc} doesn't have a Python codec."
+                        f" Please use bytes instead of str"
+                    )
+            return s.encode(py_enc)
         else:
             raise TypeError(f"expected bytes or str, got {s!r} instead")
 
-    def _decode(self, b):
+    def _decode(self, b, py_enc=None):
         if b is None:
             return None
-        # TODO: decode in client encoding
-        return b.decode("utf8", "replace")
+
+        if py_enc is None:
+            pg_enc = self.parameter_status("client_encoding")
+            py_enc = py_codecs[pg_enc]
+
+        if py_enc is not None:
+            return b.decode(py_enc)
+        else:
+            # pretty much a punt, but this is only for communication, no data
+            return b.decode("utf8", "replace")
 
 
 class PGresult:
diff --git a/psycopg3/pq_encodings.py b/psycopg3/pq_encodings.py
new file mode 100644 (file)
index 0000000..824959f
--- /dev/null
+++ b/psycopg3/pq_encodings.py
@@ -0,0 +1,44 @@
+py_codecs = {
+    "BIG5": "big5",
+    "EUC_CN": "gb2312",
+    "EUC_JIS_2004": "euc_jis_2004",
+    "EUC_JP": "euc_jp",
+    "EUC_KR": "euc_kr",
+    "EUC_TW": None,  # not available in Python
+    "GB18030": "gb18030",
+    "GBK": "gbk",
+    "ISO_8859_5": "iso8859-5",
+    "ISO_8859_6": "iso8859-6",
+    "ISO_8859_7": "iso8859-7",
+    "ISO_8859_8": "iso8859-8",
+    "JOHAB": "johab",
+    "KOI8R": "koi8-r",
+    "KOI8U": "koi8-u",
+    "LATIN1": "iso8859-1",
+    "LATIN10": "iso8859-16",
+    "LATIN2": "iso8859-2",
+    "LATIN3": "iso8859-3",
+    "LATIN4": "iso8859-4",
+    "LATIN5": "iso8859-9",
+    "LATIN6": "iso8859-10",
+    "LATIN7": "iso8859-13",
+    "LATIN8": "iso8859-14",
+    "LATIN9": "iso8859-15",
+    "MULE_INTERNAL": None,  # not available in Python
+    "SHIFT_JIS_2004": "shift_jis_2004",
+    "SJIS": "shift_jis",
+    "SQL_ASCII": None,  # means no encoding, see PostgreSQL docs
+    "UHC": "cp949",
+    "UTF8": "utf-8",
+    "WIN1250": "cp1250",
+    "WIN1251": "cp1251",
+    "WIN1252": "cp1252",
+    "WIN1253": "cp1253",
+    "WIN1254": "cp1254",
+    "WIN1255": "cp1255",
+    "WIN1256": "cp1256",
+    "WIN1257": "cp1257",
+    "WIN1258": "cp1258",
+    "WIN866": "cp866",
+    "WIN874": "cp874",
+}
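diff --git a/tests/test_pq_exec.py b/tests/test_pq_exec.py
index cc62ba3b897014051110e29d00a35b7db30b3257..50cce1c97dd054f67fefb8144fdbd3071c6ebf91 100644 (file)
--- a/tests/test_pq_exec.py
+++ b/tests/test_pq_exec.py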
@@ -9,3 +9,8 @@ def test_exec_empty(pq, pgconn):
 def test_exec_command(pq, pgconn):
     res = pgconn.exec_("set timezone to utc")
     assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
+
+
+def test_exec_error(pq, pgconn):
+    res = pgconn.exec_("wat")
+    assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
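diff --git a/tests/test_pq_pgconn.py b/tests/test_pq_pgconn.py
index 53a1f5176d11222345ffbcfd6d67f1fd4c34299f..4a72c23339b8542cec775aad7b70c89e83d99a7f 100644 (file)
--- a/tests/test_pq_pgconn.py
+++ b/tests/test_pq_pgconn.py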
@@ -150,6 +150,20 @@ def test_parameter_status(pq, dsn, tempenv):
     assert pgconn.parameter_status("wat") is None
 
 
+def test_encoding(pq, pgconn):
+    res = pgconn.exec_("set client_encoding to latin1")
+    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
+    assert pgconn.parameter_status("client_encoding") == "LATIN1"
+
+    res = pgconn.exec_("set client_encoding to 'utf-8'")
+    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
+    assert pgconn.parameter_status("client_encoding") == "UTF8"
+
+    res = pgconn.exec_("set client_encoding to wat")
+    assert res.status == pq.ExecStatus.PGRES_FATAL_ERROR
+    assert pgconn.parameter_status("client_encoding") == "UTF8"
+
+
 def test_protocol_version(pgconn):
     assert pgconn.protocol_version == 3
 
@@ -158,6 +172,15 @@ def test_server_version(pgconn):
     assert pgconn.server_version >= 90400
 
 
+def test_error_message(pq, pgconn):
+    res = pgconn.exec_("set client_encoding to latin9")
+    assert res.status == pq.ExecStatus.PGRES_COMMAND_OK
+    res = pgconn.exec_(b"set client_encoding to '\xa4'")  # euro sign in latin9
+    msg = pgconn.error_message
+    assert isinstance(msg, str)  # decoded
+    assert "\u20ac" in msg  # decoded ok
+
+
 def test_backend_pid(pgconn):
     assert 2 <= pgconn.backend_pid <= 65535  # Unless increased in kernel?