Current release
---------------
+Psycopg 3.0.4
+^^^^^^^^^^^^^
+
+- Allow to use the module with strict strings comparison (:ticket:`#147`).
+
+
Psycopg 3.0.3
^^^^^^^^^^^^^
}
py_codecs: Dict[Union[bytes, str], str] = {}
-py_codecs.update((k, v) for k, v in _py_codecs.items())
py_codecs.update((k.encode(), v) for k, v in _py_codecs.items())
pg_codecs = {v: k.encode() for k, v in _py_codecs.items()}
return pg_codecs[codecs.lookup(name).name]
-def pg2pyenc(name: Union[bytes, str]) -> str:
+def pg2pyenc(name: bytes) -> str:
"""Convert a Python encoding name to PostgreSQL encoding name.
Raise NotSupportedError if the PostgreSQL encoding is not supported by
try:
return py_codecs[name]
except KeyError:
- if isinstance(name, bytes):
- name = name.decode("utf8", "replace")
- raise NotSupportedError(f"codec not available in Python: {name!r}")
+ sname = name.decode("utf8", "replace")
+ raise NotSupportedError(f"codec not available in Python: {sname!r}")
The results of this function can be obtained accessing the object
attributes (`query`, `params`, `types`, `formats`).
"""
- if isinstance(query, Composable):
- query = query.as_bytes(self._tx)
+ if isinstance(query, str):
+ bquery = query.encode(self._encoding)
+ elif isinstance(query, Composable):
+ bquery = query.as_bytes(self._tx)
+ else:
+ bquery = query
if vars is not None:
(
self._want_formats,
self._order,
self._parts,
- ) = _query2pg(query, self._encoding)
+ ) = _query2pg(bquery, self._encoding)
else:
- if isinstance(query, str):
- query = query.encode(self._encoding)
- self.query = query
+ self.query = bquery
self._want_formats = self._order = None
self.dump(vars)
@lru_cache()
def _query2pg(
- query: Union[bytes, str], encoding: str
+ query: bytes, encoding: str
) -> Tuple[bytes, List[PyFormat], Optional[List[str]], List[QueryPart]]:
"""
Convert Python query and params into something Postgres understands.
(sequence of names used in the query, in the position they appear)
``parts`` (splits of queries and placeholders).
"""
- if isinstance(query, str):
- query = query.encode(encoding)
- if not isinstance(query, bytes):
- # encoding from str already happened
- raise TypeError(
- f"the query should be str or bytes,"
- f" got {type(query).__name__} instead"
- )
-
parts = _split_query(query, encoding)
order: Optional[List[str]] = None
chunks: List[bytes] = []
if item is None:
item = m.group(4)
if item is not None:
- upper = load(_re_undouble.sub(r"\1", item))
+ upper = load(_re_undouble.sub(rb"\1", item))
else:
upper = load(item)
or [i for i in info if i.keyword == b"password"][0].val is not None
)
if has_password:
- # The assumption that the password is needed is broken on the Travis
- # PG 10 setup so let's skip that
- print("\n".join(map(str, sorted(os.environ.items()))))
- if not (os.environ.get("TRAVIS") and os.environ.get("PGVER") == "10"):
- assert pgconn.used_password
+ assert pgconn.used_password
pgconn.finish()
pgconn.used_password
def test_encoding(self, conn):
enc = conn.execute("show client_encoding").fetchone()[0]
- assert conn.info.encoding == pg2pyenc(enc)
+ assert conn.info.encoding == pg2pyenc(enc.encode())
@pytest.mark.parametrize(
"enc, out, codec",
f"select pg_typeof(%{fmt_in})",
[info.python_type(10, Range(empty=True), [])],
)
- print(cur._query.params[0])
assert cur.fetchone()[0] == "tmptype"