from typing import Any
from cpython.bytes cimport PyBytes_AsStringAndSize
+from cpython.bytearray cimport PyByteArray_FromStringAndSize, PyByteArray_Resize
+from cpython.bytearray cimport PyByteArray_AS_STRING
from psycopg3_c cimport libpq as impl
from psycopg3_c.adapt cimport cloader_func, get_context_func
-from psycopg3_c.pq_cython cimport Escaping
+from psycopg3_c.pq_cython cimport Escaping, _buffer_as_string_and_size
+from psycopg3 import errors as e
from psycopg3.pq import Format
+from psycopg3.pq.misc import error_message
import logging
logger = logging.getLogger("psycopg3.adapt")
def dump(self, obj: Any) -> bytes:
raise NotImplementedError()
- def quote(self, obj: Any) -> bytes:
- # TODO: can be optimized
- cdef object ovalue = self.dump(obj)
-
- cdef bytes value
- if isinstance(ovalue, bytes):
- value = ovalue
+ def quote(self, obj: Any) -> bytearray:
+ cdef char *ptr
+ cdef char *ptr_out
+ cdef Py_ssize_t length, len_out
+ cdef int error
+ cdef bytearray rv
+
+ pyout = self.dump(obj)
+ _buffer_as_string_and_size(pyout, &ptr, &length)
+ rv = PyByteArray_FromStringAndSize("", 0)
+ PyByteArray_Resize(rv, length * 2 + 3) # Must include the quotes
+ ptr_out = PyByteArray_AS_STRING(rv)
+
+ if self._pgconn is not None:
+ if self._pgconn.pgconn_ptr == NULL:
+ raise e.OperationalError("the connection is closed")
+
+ len_out = impl.PQescapeStringConn(
+ self._pgconn.pgconn_ptr, ptr_out + 1, ptr, length, &error
+ )
+ if error:
+ raise e.OperationalError(
+ f"escape_string failed: {error_message(self.connection)}"
+ )
else:
- value = bytes(ovalue)
+ len_out = impl.PQescapeString(ptr_out + 1, ptr, length)
- cdef bytes tmp
- cdef Escaping esc
+ ptr_out[0] = b'\''
+ ptr_out[len_out + 1] = b'\''
+ PyByteArray_Resize(rv, len_out + 2)
- if self.connection:
- esc = Escaping(self._pgconn)
- return bytes(esc.escape_literal(value))
-
- else:
- esc = Escaping()
- tmp = bytes(esc.escape_string(value))
- return b"'%s'" % tmp
+ return rv
@property
def oid(self) -> int: