]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Added wrapper for escape_bytea with no connection
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 3 Apr 2020 11:57:21 +0000 (00:57 +1300)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 3 Apr 2020 11:57:21 +0000 (00:57 +1300)
Changed unescape_bytea to be a normal method. The pgconn is now
optional.

psycopg3/pq/_pq_ctypes.py
psycopg3/pq/_pq_ctypes.pyi
psycopg3/pq/pq_ctypes.py
psycopg3/types/text.py
tests/pq/test_escaping.py

index 135075aac3efa31df064a42718446dc47b5efcf5..3dc48645288fbc4cf4fe555d8130bb29133cfca4 100644 (file)
@@ -371,7 +371,14 @@ PQescapeByteaConn.argtypes = [
 ]
 PQescapeByteaConn.restype = POINTER(c_ubyte)
 
-# PQescapeBytea: deprecated
+PQescapeBytea = pq.PQescapeBytea
+PQescapeBytea.argtypes = [
+    POINTER(c_char),  # actually POINTER(c_ubyte) but this is easier
+    c_size_t,
+    POINTER(c_size_t),
+]
+PQescapeBytea.restype = POINTER(c_ubyte)
+
 
 PQunescapeBytea = pq.PQunescapeBytea
 PQunescapeBytea.argtypes = [
index 9408d7f7edd367fe3ba56b4488614c853002bd8f..340e632058310772cfc5242b51088daa7ab131e3 100644 (file)
@@ -103,6 +103,7 @@ def PQcmdStatus(arg1: Optional[PGresult_struct]) -> bytes: ...
 def PQcmdTuples(arg1: Optional[PGresult_struct]) -> bytes: ...
 def PQoidValue(arg1: Optional[PGresult_struct]) -> int: ...
 def PQescapeByteaConn(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int, arg4: pointer[c_ulong]) -> pointer[c_ubyte]: ...
+def PQescapeBytea(arg1: bytes, arg2: int, arg3: pointer[c_ulong]) -> pointer[c_ubyte]: ...
 def PQunescapeBytea(arg1: bytes, arg2: pointer[c_ulong]) -> pointer[c_ubyte]: ...
 def PQsendQuery(arg1: Optional[PGconn_struct], arg2: bytes) -> int: ...
 def PQsendQueryParams(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int, arg4: pointer[c_uint], arg5: pointer[c_char_p], arg6: pointer[c_int], arg7: pointer[c_int], arg8: int) -> int: ...
index 626275ed4ff66502839b4e767457b1375292b3ff..e84223dcb90712e34a4952b3373c8a96398ef23f 100644 (file)
@@ -537,17 +537,22 @@ class Conninfo:
 
 
 class Escaping:
-    def __init__(self, conn: PGconn):
+    def __init__(self, conn: Optional[PGconn] = None):
         self.conn = conn
 
     def escape_bytea(self, data: bytes) -> bytes:
         len_out = c_size_t()
-        out = impl.PQescapeByteaConn(
-            self.conn.pgconn_ptr,
-            data,
-            len(data),
-            pointer(t_cast(c_ulong, len_out)),
-        )
+        if self.conn is not None:
+            out = impl.PQescapeByteaConn(
+                self.conn.pgconn_ptr,
+                data,
+                len(data),
+                pointer(t_cast(c_ulong, len_out)),
+            )
+        else:
+            out = impl.PQescapeBytea(
+                data, len(data), pointer(t_cast(c_ulong, len_out)),
+            )
         if not out:
             raise MemoryError(
                 f"couldn't allocate for escape_bytea of {len(data)} bytes"
@@ -557,8 +562,7 @@ class Escaping:
         impl.PQfreemem(out)
         return rv
 
-    @classmethod
-    def unescape_bytea(cls, data: bytes) -> bytes:
+    def unescape_bytea(self, data: bytes) -> bytes:
         len_out = c_size_t()
         out = impl.PQunescapeBytea(data, pointer(t_cast(c_ulong, len_out)))
         if not out:
index 7ad58788089f73cdbe4f8a89a70bd9f4d7cf189f..5533fe80f3e4657df02dc4744c1998e909b8748c 100644 (file)
@@ -70,7 +70,9 @@ class StringCaster(TypeCaster):
 class BytesAdapter(Adapter):
     def __init__(self, cls: type, conn: BaseConnection):
         super().__init__(cls, conn)
-        self.esc = Escaping(self.conn.pgconn)
+        self.esc = Escaping(
+            self.conn.pgconn if self.conn is not None else None
+        )
 
     def adapt(self, obj: bytes) -> Tuple[bytes, int]:
         return self.esc.escape_bytea(obj), BYTEA_OID
@@ -84,7 +86,7 @@ def adapt_bytes(b: bytes) -> Tuple[bytes, int]:
 @TypeCaster.text(builtins["bytea"].oid)
 @ArrayCaster.text(builtins["bytea"].array_oid)
 def cast_bytea(data: bytes) -> bytes:
-    return Escaping.unescape_bytea(data)
+    return Escaping().unescape_bytea(data)
 
 
 @TypeCaster.binary(builtins["bytea"].oid)
index 67495aa2b65e7c0db0c302e7c35d759993fba83e..01ec91722573e7f9d3a0e04480fa4e67972c59b0 100644 (file)
@@ -10,6 +10,17 @@ def test_escape_bytea(pq, pgconn, data):
     assert rv == exp
 
 
+def test_escape_noconn(pq, pgconn):
+    data = bytes(range(256))
+    esc = pq.Escaping()
+    escdata = esc.escape_bytea(data)
+    res = pgconn.exec_params(
+        b"select '%s'::bytea" % escdata, [], result_format=1
+    )
+    assert res.status == pq.ExecStatus.TUPLES_OK
+    assert res.get_value(0, 0) == data
+
+
 def test_escape_1char(pq, pgconn):
     esc = pq.Escaping(pgconn)
     for c in range(256):
@@ -23,5 +34,5 @@ def test_escape_1char(pq, pgconn):
 )
 def test_unescape_bytea(pq, pgconn, data):
     enc = br"\x" + b"".join(b"%02x" % c for c in data)
-    rv = pq.Escaping.unescape_bytea(enc)
+    rv = pq.Escaping(pgconn).unescape_bytea(enc)
     assert rv == data