From: Daniele Varrazzo Date: Thu, 2 Apr 2020 09:19:47 +0000 (+1300) Subject: Added bytes adaptation X-Git-Tag: 3.0.dev0~627 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=03f4cfd5fd78051dc87348873a035d12b27cb720;p=thirdparty%2Fpsycopg.git Added bytes adaptation --- diff --git a/psycopg3/pq/_pq_ctypes.py b/psycopg3/pq/_pq_ctypes.py index c2debb5d5..59d498bea 100644 --- a/psycopg3/pq/_pq_ctypes.py +++ b/psycopg3/pq/_pq_ctypes.py @@ -7,7 +7,7 @@ libpq access using ctypes import ctypes import ctypes.util from ctypes import Structure, POINTER -from ctypes import c_char, c_char_p, c_int, c_uint, c_void_p +from ctypes import c_char, c_char_p, c_int, c_size_t, c_ubyte, c_uint, c_void_p from typing import List, Tuple from psycopg3.errors import NotSupportedError @@ -358,6 +358,22 @@ PQoidValue.argtypes = [PGresult_ptr] PQoidValue.restype = Oid +# 33.3.4. Escaping Strings for Inclusion in SQL Commands + +# TODO: PQescapeLiteral PQescapeIdentifier PQescapeStringConn PQescapeString + +PQescapeByteaConn = pq.PQescapeByteaConn +PQescapeByteaConn.argtypes = [ + PGconn_ptr, + POINTER(c_char), # actually POINTER(c_ubyte) but this is easier + c_size_t, + POINTER(c_size_t), +] +PQescapeByteaConn.restype = POINTER(c_ubyte) # same, POINTER(c_ubyte) + +# TODO: PQescapeBytea PQunescapeBytea + + # 33.4. Asynchronous Command Processing PQsendQuery = pq.PQsendQuery @@ -425,7 +441,7 @@ def generate_stub() -> None: return "None" elif t is c_void_p: return "Any" - elif t is c_int or t is c_uint: + elif t is c_int or t is c_uint or t is c_size_t: return "int" elif t is c_char_p: return "bytes" diff --git a/psycopg3/pq/_pq_ctypes.pyi b/psycopg3/pq/_pq_ctypes.pyi index 99b8acca9..bc45cbb27 100644 --- a/psycopg3/pq/_pq_ctypes.pyi +++ b/psycopg3/pq/_pq_ctypes.pyi @@ -5,7 +5,8 @@ types stub for ctypes functions # Copyright (C) 2020 The Psycopg Team from typing import Any, Optional, Sequence, NewType -from ctypes import Array, c_char, c_char_p, c_int, c_uint, pointer +from ctypes import Array, pointer +from ctypes import c_char, c_char_p, c_int, c_ubyte, c_uint, c_ulong Oid = c_uint @@ -38,6 +39,12 @@ def PQprepare( arg4: int, arg5: Optional[Array[c_uint]], ) -> PGresult_struct: ... +def PQescapeByteaConn( + arg1: Optional[PGconn_struct], + arg2: bytes, + arg3: int, + arg4: pointer[c_ulong], +) -> pointer[c_ubyte]: ... # fmt: off # autogenerated: start diff --git a/psycopg3/pq/pq_ctypes.py b/psycopg3/pq/pq_ctypes.py index ee053fb00..e9c1d3311 100644 --- a/psycopg3/pq/pq_ctypes.py +++ b/psycopg3/pq/pq_ctypes.py @@ -8,9 +8,10 @@ implementation. # Copyright (C) 2020 The Psycopg Team -from ctypes import string_at -from ctypes import Array, c_char_p, c_int, pointer +from ctypes import Array, pointer, string_at +from ctypes import c_char_p, c_int, c_size_t, c_ulong from typing import Any, List, Optional, Sequence +from typing import cast as t_cast from .enums import ( ConnStatus, @@ -361,6 +362,23 @@ class PGconn: raise MemoryError("couldn't allocate PGresult") return PGresult(rv) + def escape_bytea(self, data: bytes) -> bytes: + len_out = c_size_t() + out = impl.PQescapeByteaConn( + self.pgconn_ptr, + data, + len(data), + pointer(t_cast(c_ulong, len_out)), + ) + if not out: + raise MemoryError( + f"couldn't allocate {len(data)} bytes for escape_bytea" + ) + + rv = string_at(out, len_out.value - 1) # out includes final 0 + impl.PQfreemem(out) + return rv + def get_result(self) -> Optional["PGresult"]: rv = impl.PQgetResult(self.pgconn_ptr) return PGresult(rv) if rv else None diff --git a/psycopg3/types/text.py b/psycopg3/types/text.py index 1fdbd7ce9..36d426817 100644 --- a/psycopg3/types/text.py +++ b/psycopg3/types/text.py @@ -5,7 +5,7 @@ Adapters of textual types. # Copyright (C) 2020 The Psycopg Team import codecs -from typing import Optional, Union +from typing import Optional, Tuple, Union from ..adapt import ( Adapter, @@ -32,6 +32,17 @@ class StringAdapter(Adapter): return self._encode(obj)[0] +@Adapter.text(bytes) +class BytesAdapter(Adapter): + def adapt(self, obj: bytes) -> Tuple[bytes, Oid]: + return self.conn.pgconn.escape_bytea(obj), type_oid["bytea"] + + +@Adapter.binary(bytes) +def adapt_bytes(b: bytes) -> Tuple[bytes, Oid]: + return b, type_oid["bytea"] + + @Typecaster.text(type_oid["text"]) @Typecaster.binary(type_oid["text"]) class StringCaster(Typecaster): diff --git a/tests/pq/test_pgconn.py b/tests/pq/test_pgconn.py index 131347935..4d056c2b6 100644 --- a/tests/pq/test_pgconn.py +++ b/tests/pq/test_pgconn.py @@ -226,3 +226,19 @@ def test_make_empty_result(pq, pgconn): res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR) assert res.status == pq.ExecStatus.FATAL_ERROR assert b"wat" in res.error_message + + +@pytest.mark.parametrize( + "data", [(b"hello\00world"), (b"\00\00\00\00")], +) +def test_escape_bytea(pgconn, data): + rv = pgconn.escape_bytea(data) + exp = br"\x" + b"".join(b"%02x" % c for c in data) + assert rv == exp + + +def test_escape_1char(pgconn): + for c in range(256): + rv = pgconn.escape_bytea(bytes([c])) + exp = br"\x%02x" % c + assert rv == exp