It gives directly a c variable.
cdef readonly object connection
cdef pq.PGconn _pgconn
- def __init__(self, src: type, context: Optional["AdaptContext"] = None):
+ def __init__(self, src: type, context: Optional[AdaptContext] = None):
self.src = src
self.connection = context.connection if context is not None else None
self._pgconn = (
cls,
src: Union[type, str],
context: Optional[AdaptContext] = None,
- format: Format = Format.TEXT,
+ int format = Format.TEXT,
) -> None:
if context is not None:
adapters = context.adapters
cdef public libpq.Oid oid
cdef public connection
- def __init__(self, oid: int, context: Optional["AdaptContext"] = None):
+ def __init__(self, int oid, context: Optional[AdaptContext] = None):
self.oid = oid
self.connection = context.connection if context is not None else None
@classmethod
def register(
cls,
- oid: int,
+ int oid,
context: Optional["AdaptContext"] = None,
- format: Format = Format.TEXT,
+ int format = Format.TEXT,
) -> None:
if context is not None:
adapters = context.adapters
f" to format {Format(format).name}"
)
- def load_rows(self, row0: int, row1: int) -> Sequence[Tuple[Any, ...]]:
+ def load_rows(self, int row0, int row1) -> Sequence[Tuple[Any, ...]]:
if self._pgresult is None:
raise e.InterfaceError("result not set")
- cdef int crow0 = row0
- cdef int crow1 = row1
- if not (0 <= crow0 <= self._ntuples and 0 <= crow1 <= self._ntuples):
+ if not (0 <= row0 <= self._ntuples and 0 <= row1 <= self._ntuples):
raise e.InterfaceError(
f"rows must be included between 0 and {self._ntuples}"
)
cdef const char *val
cdef object record # not 'tuple' as it would check on assignment
- cdef object records = PyList_New(crow1 - crow0)
- for row in range(crow0, crow1):
+ cdef object records = PyList_New(row1 - row0)
+ for row in range(row0, row1):
record = PyTuple_New(self._nfields)
Py_INCREF(record)
- PyList_SET_ITEM(records, row - crow0, record)
+ PyList_SET_ITEM(records, row - row0, record)
cdef RowLoader loader
cdef CLoader cloader
if loader.cloader is not None:
cloader = loader.cloader
- for row in range(crow0, crow1):
- brecord = PyList_GET_ITEM(records, row - crow0)
+ for row in range(row0, row1):
+ brecord = PyList_GET_ITEM(records, row - row0)
attval = &(ires.tuples[row][col])
if attval.len == -1: # NULL_LEN
Py_INCREF(None)
else:
pyloader = loader.pyloader
- for row in range(crow0, crow1):
- brecord = PyList_GET_ITEM(records, row - crow0)
+ for row in range(row0, row1):
+ brecord = PyList_GET_ITEM(records, row - row0)
attval = &(ires.tuples[row][col])
if attval.len == -1: # NULL_LEN
Py_INCREF(None)
return records
- def load_row(self, row: int) -> Optional[Tuple[Any, ...]]:
+ def load_row(self, int row) -> Optional[Tuple[Any, ...]]:
if self._pgresult is None:
return None
- cdef int crow = row
- if not 0 <= crow < self._ntuples:
+ if not 0 <= row < self._ntuples:
return None
cdef libpq.PGresult *res = self._pgresult.pgresult_ptr
record = PyTuple_New(self._nfields)
for col in range(self._nfields):
- attval = &(ires.tuples[crow][col])
+ attval = &(ires.tuples[row][col])
if attval.len == -1: # NULL_LEN
Py_INCREF(None)
PyTuple_SET_ITEM(record, col, None)
cdef object _c_get_loader(self, libpq.Oid oid, int format):
cdef dict cache
+ cdef object pyoid = oid
+
if format == 0:
cache = self._text_loaders
else:
cache = self._binary_loaders
- if oid in cache:
- return cache[oid]
+ cdef PyObject *ptr = PyDict_GetItem(cache, pyoid)
+ if ptr != NULL:
+ return <object>ptr
- loader_cls = self.adapters._loaders.get((oid, format))
+ loader_cls = self.adapters._loaders.get((pyoid, format))
if loader_cls is None:
loader_cls = self.adapters._loaders[oids.INVALID_OID, format]
- loader = cache[oid] = loader_cls(oid, self)
+ loader = cache[pyoid] = loader_cls(pyoid, self)
return loader
from posix.unistd cimport getpid
from cpython.mem cimport PyMem_Malloc, PyMem_Free
-from cpython.bytes cimport PyBytes_AsString
+from cpython.bytes cimport PyBytes_AsString, PyBytes_AsStringAndSize
import logging
param_values: Optional[Sequence[Optional[bytes]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
- result_format: int = Format.TEXT,
+ int result_format = Format.TEXT,
) -> PGresult:
_ensure_pgconn(self)
cdef int *clengths
cdef int *cformats
cdef const char *ccommand = command
- cdef int cresformat = result_format
cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
param_values, param_types, param_formats)
with nogil:
pgresult = libpq.PQexecParams(
self.pgconn_ptr, ccommand, cnparams, ctypes,
- <const char *const *>cvalues, clengths, cformats, cresformat)
+ <const char *const *>cvalues, clengths, cformats, result_format)
_clear_query_params(ctypes, cvalues, clengths, cformats)
if pgresult is NULL:
raise MemoryError("couldn't allocate PGresult")
param_values: Optional[Sequence[Optional[bytes]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
- result_format: int = Format.TEXT,
+ int result_format = Format.TEXT,
) -> None:
_ensure_pgconn(self)
cdef int *clengths
cdef int *cformats
cdef const char *ccommand = command
- cdef int cresformat = result_format
cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
param_values, param_types, param_formats)
with nogil:
rv = libpq.PQsendQueryParams(
self.pgconn_ptr, ccommand, cnparams, ctypes,
- <const char *const *>cvalues, clengths, cformats, cresformat)
+ <const char *const *>cvalues, clengths, cformats, result_format)
_clear_query_params(ctypes, cvalues, clengths, cformats)
if not rv:
raise PQerror(
name: bytes,
param_values: Optional[Sequence[Optional[bytes]]],
param_formats: Optional[Sequence[int]] = None,
- result_format: int = Format.TEXT,
+ int result_format = Format.TEXT,
) -> None:
_ensure_pgconn(self)
cdef int *clengths
cdef int *cformats
cdef const char *cname = name
- cdef int cresformat = result_format
cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
param_values, None, param_formats)
with nogil:
rv = libpq.PQsendQueryPrepared(
self.pgconn_ptr, cname, cnparams, <const char *const *>cvalues,
- clengths, cformats, cresformat)
+ clengths, cformats, result_format)
_clear_query_params(ctypes, cvalues, clengths, cformats)
if not rv:
raise PQerror(
def exec_prepared(
self,
- name: bytes,
+ bytes name,
param_values: Optional[Sequence[bytes]],
param_formats: Optional[Sequence[int]] = None,
- result_format: int = 0,
+ int result_format = Format.TEXT,
) -> PGresult:
_ensure_pgconn(self)
param_values, None, param_formats)
cdef const char *cname = name
- cdef int cresformat = result_format
cdef libpq.PGresult *rv
with nogil:
rv = libpq.PQexecPrepared(
self.pgconn_ptr, cname, cnparams,
<const char *const *>cvalues,
- clengths, cformats, cresformat)
+ clengths, cformats, result_format)
_clear_query_params(ctypes, cvalues, clengths, cformats)
if rv is NULL:
return libpq.PQisnonblocking(self.pgconn_ptr)
@nonblocking.setter
- def nonblocking(self, arg: int) -> None:
+ def nonblocking(self, int arg) -> None:
if 0 > libpq.PQsetnonblocking(self.pgconn_ptr, arg):
raise PQerror(f"setting nonblocking failed: {error_message(self)}")
def put_copy_data(self, buffer: bytes) -> int:
cdef int rv
- cdef const char *cbuffer = PyBytes_AsString(buffer)
- cdef int length = len(buffer)
+ cdef char *cbuffer
+ cdef Py_ssize_t length
+
+ PyBytes_AsStringAndSize(buffer, &cbuffer, &length)
rv = libpq.PQputCopyData(self.pgconn_ptr, cbuffer, length)
if rv < 0:
raise PQerror(f"sending copy data failed: {error_message(self)}")
raise PQerror(f"sending copy end failed: {error_message(self)}")
return rv
- def get_copy_data(self, async_: int) -> Tuple[int, bytes]:
+ def get_copy_data(self, int async_) -> Tuple[int, bytes]:
cdef char *buffer_ptr = NULL
cdef int nbytes
nbytes = libpq.PQgetCopyData(self.pgconn_ptr, &buffer_ptr, async_)
else:
return nbytes, b""
- def make_empty_result(self, exec_status: int) -> PGresult:
+ def make_empty_result(self, int exec_status) -> PGresult:
cdef libpq.PGresult *rv = libpq.PQmakeEmptyPGresult(
- self.pgconn_ptr, exec_status)
+ self.pgconn_ptr, <libpq.ExecStatusType>exec_status)
if not rv:
raise MemoryError("couldn't allocate empty PGresult")
return PGresult._from_ptr(rv)
def nfields(self) -> int:
return libpq.PQnfields(self.pgresult_ptr)
- def fname(self, column_number: int) -> Optional[bytes]:
+ def fname(self, int column_number) -> Optional[bytes]:
cdef char *rv = libpq.PQfname(self.pgresult_ptr, column_number)
if rv is not NULL:
return rv
else:
return None
- def ftable(self, column_number: int) -> int:
+ def ftable(self, int column_number) -> int:
return libpq.PQftable(self.pgresult_ptr, column_number)
- def ftablecol(self, column_number: int) -> int:
+ def ftablecol(self, int column_number) -> int:
return libpq.PQftablecol(self.pgresult_ptr, column_number)
- def fformat(self, column_number: int) -> Format:
+ def fformat(self, int column_number) -> Format:
return Format(libpq.PQfformat(self.pgresult_ptr, column_number))
- def ftype(self, column_number: int) -> int:
+ def ftype(self, int column_number) -> int:
return libpq.PQftype(self.pgresult_ptr, column_number)
- def fmod(self, column_number: int) -> int:
+ def fmod(self, int column_number) -> int:
return libpq.PQfmod(self.pgresult_ptr, column_number)
- def fsize(self, column_number: int) -> int:
+ def fsize(self, int column_number) -> int:
return libpq.PQfsize(self.pgresult_ptr, column_number)
@property
def binary_tuples(self) -> Format:
return Format(libpq.PQbinaryTuples(self.pgresult_ptr))
- def get_value(
- self, row_number: int, column_number: int
- ) -> Optional[bytes]:
+ def get_value(self, int row_number, int column_number) -> Optional[bytes]:
cdef int crow = row_number
cdef int ccol = column_number
cdef int length = libpq.PQgetlength(self.pgresult_ptr, crow, ccol)
def nparams(self) -> int:
return libpq.PQnparams(self.pgresult_ptr)
- def param_type(self, param_number: int) -> int:
+ def param_type(self, int param_number) -> int:
return libpq.PQparamtype(self.pgresult_ptr, param_number)
@property