import ctypes
import ctypes.util
from ctypes import Structure, POINTER
-from ctypes import c_char, c_char_p, c_int, c_uint, c_void_p
+from ctypes import c_char, c_char_p, c_int, c_size_t, c_ubyte, c_uint, c_void_p
from typing import List, Tuple
from psycopg3.errors import NotSupportedError
PQoidValue.restype = Oid
+# 33.3.4. Escaping Strings for Inclusion in SQL Commands
+
+# TODO: PQescapeLiteral PQescapeIdentifier PQescapeStringConn PQescapeString
+
+PQescapeByteaConn = pq.PQescapeByteaConn
+PQescapeByteaConn.argtypes = [
+ PGconn_ptr,
+ POINTER(c_char), # actually POINTER(c_ubyte) but this is easier
+ c_size_t,
+ POINTER(c_size_t),
+]
+PQescapeByteaConn.restype = POINTER(c_ubyte) # same, POINTER(c_ubyte)
+
+# TODO: PQescapeBytea PQunescapeBytea
+
+
# 33.4. Asynchronous Command Processing
PQsendQuery = pq.PQsendQuery
return "None"
elif t is c_void_p:
return "Any"
- elif t is c_int or t is c_uint:
+ elif t is c_int or t is c_uint or t is c_size_t:
return "int"
elif t is c_char_p:
return "bytes"
# Copyright (C) 2020 The Psycopg Team
from typing import Any, Optional, Sequence, NewType
-from ctypes import Array, c_char, c_char_p, c_int, c_uint, pointer
+from ctypes import Array, pointer
+from ctypes import c_char, c_char_p, c_int, c_ubyte, c_uint, c_ulong
Oid = c_uint
arg4: int,
arg5: Optional[Array[c_uint]],
) -> PGresult_struct: ...
+def PQescapeByteaConn(
+ arg1: Optional[PGconn_struct],
+ arg2: bytes,
+ arg3: int,
+ arg4: pointer[c_ulong],
+) -> pointer[c_ubyte]: ...
# fmt: off
# autogenerated: start
# Copyright (C) 2020 The Psycopg Team
-from ctypes import string_at
-from ctypes import Array, c_char_p, c_int, pointer
+from ctypes import Array, pointer, string_at
+from ctypes import c_char_p, c_int, c_size_t, c_ulong
from typing import Any, List, Optional, Sequence
+from typing import cast as t_cast
from .enums import (
ConnStatus,
raise MemoryError("couldn't allocate PGresult")
return PGresult(rv)
+ def escape_bytea(self, data: bytes) -> bytes:
+ len_out = c_size_t()
+ out = impl.PQescapeByteaConn(
+ self.pgconn_ptr,
+ data,
+ len(data),
+ pointer(t_cast(c_ulong, len_out)),
+ )
+ if not out:
+ raise MemoryError(
+ f"couldn't allocate {len(data)} bytes for escape_bytea"
+ )
+
+ rv = string_at(out, len_out.value - 1) # out includes final 0
+ impl.PQfreemem(out)
+ return rv
+
def get_result(self) -> Optional["PGresult"]:
rv = impl.PQgetResult(self.pgconn_ptr)
return PGresult(rv) if rv else None
# Copyright (C) 2020 The Psycopg Team
import codecs
-from typing import Optional, Union
+from typing import Optional, Tuple, Union
from ..adapt import (
Adapter,
return self._encode(obj)[0]
+@Adapter.text(bytes)
+class BytesAdapter(Adapter):
+ def adapt(self, obj: bytes) -> Tuple[bytes, Oid]:
+ return self.conn.pgconn.escape_bytea(obj), type_oid["bytea"]
+
+
+@Adapter.binary(bytes)
+def adapt_bytes(b: bytes) -> Tuple[bytes, Oid]:
+ return b, type_oid["bytea"]
+
+
@Typecaster.text(type_oid["text"])
@Typecaster.binary(type_oid["text"])
class StringCaster(Typecaster):
res = pgconn.make_empty_result(pq.ExecStatus.FATAL_ERROR)
assert res.status == pq.ExecStatus.FATAL_ERROR
assert b"wat" in res.error_message
+
+
+@pytest.mark.parametrize(
+ "data", [(b"hello\00world"), (b"\00\00\00\00")],
+)
+def test_escape_bytea(pgconn, data):
+ rv = pgconn.escape_bytea(data)
+ exp = br"\x" + b"".join(b"%02x" % c for c in data)
+ assert rv == exp
+
+
+def test_escape_1char(pgconn):
+ for c in range(256):
+ rv = pgconn.escape_bytea(bytes([c]))
+ exp = br"\x%02x" % c
+ assert rv == exp