]
PQescapeByteaConn.restype = POINTER(c_ubyte)
-# PQescapeBytea: deprecated
+PQescapeBytea = pq.PQescapeBytea
+PQescapeBytea.argtypes = [
+ POINTER(c_char), # actually POINTER(c_ubyte) but this is easier
+ c_size_t,
+ POINTER(c_size_t),
+]
+PQescapeBytea.restype = POINTER(c_ubyte)
+
PQunescapeBytea = pq.PQunescapeBytea
PQunescapeBytea.argtypes = [
def PQcmdTuples(arg1: Optional[PGresult_struct]) -> bytes: ...
def PQoidValue(arg1: Optional[PGresult_struct]) -> int: ...
def PQescapeByteaConn(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int, arg4: pointer[c_ulong]) -> pointer[c_ubyte]: ...
+def PQescapeBytea(arg1: bytes, arg2: int, arg3: pointer[c_ulong]) -> pointer[c_ubyte]: ...
def PQunescapeBytea(arg1: bytes, arg2: pointer[c_ulong]) -> pointer[c_ubyte]: ...
def PQsendQuery(arg1: Optional[PGconn_struct], arg2: bytes) -> int: ...
def PQsendQueryParams(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int, arg4: pointer[c_uint], arg5: pointer[c_char_p], arg6: pointer[c_int], arg7: pointer[c_int], arg8: int) -> int: ...
class Escaping:
- def __init__(self, conn: PGconn):
+ def __init__(self, conn: Optional[PGconn] = None):
self.conn = conn
def escape_bytea(self, data: bytes) -> bytes:
len_out = c_size_t()
- out = impl.PQescapeByteaConn(
- self.conn.pgconn_ptr,
- data,
- len(data),
- pointer(t_cast(c_ulong, len_out)),
- )
+ if self.conn is not None:
+ out = impl.PQescapeByteaConn(
+ self.conn.pgconn_ptr,
+ data,
+ len(data),
+ pointer(t_cast(c_ulong, len_out)),
+ )
+ else:
+ out = impl.PQescapeBytea(
+ data, len(data), pointer(t_cast(c_ulong, len_out)),
+ )
if not out:
raise MemoryError(
f"couldn't allocate for escape_bytea of {len(data)} bytes"
impl.PQfreemem(out)
return rv
- @classmethod
- def unescape_bytea(cls, data: bytes) -> bytes:
+ def unescape_bytea(self, data: bytes) -> bytes:
len_out = c_size_t()
out = impl.PQunescapeBytea(data, pointer(t_cast(c_ulong, len_out)))
if not out:
class BytesAdapter(Adapter):
def __init__(self, cls: type, conn: BaseConnection):
super().__init__(cls, conn)
- self.esc = Escaping(self.conn.pgconn)
+ self.esc = Escaping(
+ self.conn.pgconn if self.conn is not None else None
+ )
def adapt(self, obj: bytes) -> Tuple[bytes, int]:
return self.esc.escape_bytea(obj), BYTEA_OID
@TypeCaster.text(builtins["bytea"].oid)
@ArrayCaster.text(builtins["bytea"].array_oid)
def cast_bytea(data: bytes) -> bytes:
- return Escaping.unescape_bytea(data)
+ return Escaping().unescape_bytea(data)
@TypeCaster.binary(builtins["bytea"].oid)
assert rv == exp
+def test_escape_noconn(pq, pgconn):
+ data = bytes(range(256))
+ esc = pq.Escaping()
+ escdata = esc.escape_bytea(data)
+ res = pgconn.exec_params(
+ b"select '%s'::bytea" % escdata, [], result_format=1
+ )
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.get_value(0, 0) == data
+
+
def test_escape_1char(pq, pgconn):
esc = pq.Escaping(pgconn)
for c in range(256):
)
def test_unescape_bytea(pq, pgconn, data):
enc = br"\x" + b"".join(b"%02x" % c for c in data)
- rv = pq.Escaping.unescape_bytea(enc)
+ rv = pq.Escaping(pgconn).unescape_bytea(enc)
assert rv == data