--- /dev/null
+"""
+Libpq header definition for the cython psycopg3.pq implementation.
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+cdef extern from "libpq-fe.h":
+ int PQlibVersion()
+
+ # structures and types
+
+ ctypedef unsigned int Oid
+
+ ctypedef struct PGconn:
+ pass
+
+ ctypedef struct PGresult:
+ pass
+
+ ctypedef struct PQconninfoOption:
+ char *keyword
+ char *envvar
+ char *compiled
+ char *val
+ char *label
+ char *dispchar
+ int dispsize
+
+ # enums
+
+ ctypedef enum PostgresPollingStatusType:
+ PGRES_POLLING_FAILED = 0
+ PGRES_POLLING_READING
+ PGRES_POLLING_WRITING
+ PGRES_POLLING_OK
+ PGRES_POLLING_ACTIVE
+
+
+ ctypedef enum PGPing:
+ PQPING_OK
+ PQPING_REJECT
+ PQPING_NO_RESPONSE
+ PQPING_NO_ATTEMPT
+
+ ctypedef enum ConnStatusType:
+ CONNECTION_OK
+ CONNECTION_BAD
+ CONNECTION_STARTED
+ CONNECTION_MADE
+ CONNECTION_AWAITING_RESPONSE
+ CONNECTION_AUTH_OK
+ CONNECTION_SETENV
+ CONNECTION_SSL_STARTUP
+ CONNECTION_NEEDED
+ CONNECTION_CHECK_WRITABLE
+ CONNECTION_GSS_STARTUP
+ # CONNECTION_CHECK_TARGET PG 12
+
+ ctypedef enum PGTransactionStatusType:
+ PQTRANS_IDLE
+ PQTRANS_ACTIVE
+ PQTRANS_INTRANS
+ PQTRANS_INERROR
+ PQTRANS_UNKNOWN
+
+ ctypedef enum ExecStatusType:
+ PGRES_EMPTY_QUERY = 0
+ PGRES_COMMAND_OK
+ PGRES_TUPLES_OK
+ PGRES_COPY_OUT
+ PGRES_COPY_IN
+ PGRES_BAD_RESPONSE
+ PGRES_NONFATAL_ERROR
+ PGRES_FATAL_ERROR
+ PGRES_COPY_BOTH
+ PGRES_SINGLE_TUPLE
+
+ # 33.1. Database Connection Control Functions
+ PGconn *PQconnectdb(const char *conninfo)
+ PGconn *PQconnectStart(const char *conninfo)
+ PostgresPollingStatusType PQconnectPoll(PGconn *conn)
+ PQconninfoOption *PQconndefaults()
+ PQconninfoOption *PQconninfo(PGconn *conn)
+ PQconninfoOption *PQconninfoParse(const char *conninfo, char **errmsg)
+ void PQfinish(PGconn *conn)
+ void PQreset(PGconn *conn)
+ int PQresetStart(PGconn *conn)
+ PostgresPollingStatusType PQresetPoll(PGconn *conn)
+ PGPing PQping(const char *conninfo)
+
+ # 33.2. Connection Status Functions
+ char *PQdb(const PGconn *conn)
+ char *PQuser(const PGconn *conn)
+ char *PQpass(const PGconn *conn)
+ char *PQhost(const PGconn *conn)
+ # char *PQhostaddr(const PGconn *conn) TODO: conditional, only libpq>=12
+ char *PQport(const PGconn *conn)
+ char *PQtty(const PGconn *conn)
+ char *PQoptions(const PGconn *conn)
+ ConnStatusType PQstatus(const PGconn *conn)
+ PGTransactionStatusType PQtransactionStatus(const PGconn *conn)
+ const char *PQparameterStatus(const PGconn *conn, const char *paramName)
+ int PQprotocolVersion(const PGconn *conn)
+ int PQserverVersion(const PGconn *conn)
+ char *PQerrorMessage(const PGconn *conn)
+ int PQsocket(const PGconn *conn)
+ int PQbackendPID(const PGconn *conn)
+ int PQconnectionNeedsPassword(const PGconn *conn)
+ int PQconnectionUsedPassword(const PGconn *conn)
+ int PQsslInUse(PGconn *conn) # TODO: const in PG 12 docs - verify/report
+ # TODO: PQsslAttribute, PQsslAttributeNames, PQsslStruct, PQgetssl
+
+ # 33.3. Command Execution Functions
+ PGresult *PQexec(PGconn *conn, const char *command)
+ PGresult *PQexecParams(PGconn *conn,
+ const char *command,
+ int nParams,
+ const Oid *paramTypes,
+ const char * const *paramValues,
+ const int *paramLengths,
+ const int *paramFormats,
+ int resultFormat)
+ PGresult *PQprepare(PGconn *conn,
+ const char *stmtName,
+ const char *query,
+ int nParams,
+ const Oid *paramTypes)
+ PGresult *PQexecPrepared(PGconn *conn,
+ const char *stmtName,
+ int nParams,
+ const char * const *paramValues,
+ const int *paramLengths,
+ const int *paramFormats,
+ int resultFormat)
+ PGresult *PQdescribePrepared(PGconn *conn, const char *stmtName)
+ PGresult *PQdescribePortal(PGconn *conn, const char *portalName)
+ ExecStatusType PQresultStatus(const PGresult *res)
+ # PQresStatus: not needed, we have pretty enums
+ char *PQresultErrorMessage(const PGresult *res)
+ # TODO: PQresultVerboseErrorMessage
+ char *PQresultErrorField(const PGresult *res, int fieldcode)
+ void PQclear(PGresult *res)
+
+ # 33.3.2. Retrieving Query Result Information
+ int PQntuples(const PGresult *res)
+ int PQnfields(const PGresult *res)
+ char *PQfname(const PGresult *res, int column_number)
+ int PQfnumber(const PGresult *res, const char *column_name)
+ Oid PQftable(const PGresult *res, int column_number)
+ int PQftablecol(const PGresult *res, int column_number)
+ int PQfformat(const PGresult *res, int column_number)
+ Oid PQftype(const PGresult *res, int column_number)
+ int PQfmod(const PGresult *res, int column_number)
+ int PQfsize(const PGresult *res, int column_number)
+ int PQbinaryTuples(const PGresult *res)
+ char *PQgetvalue(const PGresult *res, int row_number, int column_number)
+ int PQgetisnull(const PGresult *res, int row_number, int column_number)
+ int PQgetlength(const PGresult *res, int row_number, int column_number)
+ int PQnparams(const PGresult *res)
+ Oid PQparamtype(const PGresult *res, int param_number)
+ # PQprint: pretty useless
+
+ # 33.3.3. Retrieving Other Result Information
+ char *PQcmdStatus(PGresult *res)
+ char *PQcmdTuples(PGresult *res)
+ Oid PQoidValue(const PGresult *res)
+
+ # 33.3.4. Escaping Strings for Inclusion in SQL Commands
+ # TODO: PQescapeLiteral PQescapeIdentifier PQescapeStringConn PQescapeString
+ unsigned char *PQescapeByteaConn(PGconn *conn,
+ const unsigned char *src,
+ size_t from_length,
+ size_t *to_length)
+ unsigned char *PQescapeBytea(const unsigned char *src,
+ size_t from_length,
+ size_t *to_length)
+ unsigned char *PQunescapeBytea(const unsigned char *src, size_t *to_length)
+
+
+ # 33.4. Asynchronous Command Processing
+ int PQsendQuery(PGconn *conn, const char *command)
+ int PQsendQueryParams(PGconn *conn,
+ const char *command,
+ int nParams,
+ const Oid *paramTypes,
+ const char * const *paramValues,
+ const int *paramLengths,
+ const int *paramFormats,
+ int resultFormat)
+ int PQsendPrepare(PGconn *conn,
+ const char *stmtName,
+ const char *query,
+ int nParams,
+ const Oid *paramTypes)
+ int PQsendQueryPrepared(PGconn *conn,
+ const char *stmtName,
+ int nParams,
+ const char * const *paramValues,
+ const int *paramLengths,
+ const int *paramFormats,
+ int resultFormat)
+ int PQsendDescribePrepared(PGconn *conn, const char *stmtName)
+ int PQsendDescribePortal(PGconn *conn, const char *portalName)
+ PGresult *PQgetResult(PGconn *conn)
+ int PQconsumeInput(PGconn *conn)
+ int PQisBusy(PGconn *conn)
+ int PQsetnonblocking(PGconn *conn, int arg)
+ int PQisnonblocking(const PGconn *conn)
+ int PQflush(PGconn *conn)
+
+ # 33.11. Miscellaneous Functions
+ void PQfreemem(void *ptr)
+ void PQconninfoFree(PQconninfoOption *connOptions)
+ PGresult *PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
+ int PQlibVersion()
+
--- /dev/null
+"""
+libpq Python wrapper using cython bindings.
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+from cpython.mem cimport PyMem_Malloc, PyMem_Free
+
+from psycopg3.pq cimport libpq as impl
+from psycopg3.errors import OperationalError
+
+from .misc import error_message, ConninfoOption
+from .enums import (
+ ConnStatus,
+ PollingStatus,
+ ExecStatus,
+ TransactionStatus,
+ Ping,
+ DiagnosticField,
+ Format,
+)
+
+
+__impl__ = 'c'
+
+
+class PQerror(OperationalError):
+ pass
+
+
+def version():
+ return impl.PQlibVersion()
+
+ctypedef unsigned int Oid;
+ctypedef char *(*conn_bytes_f) (const impl.PGconn *)
+ctypedef int(*conn_int_f) (const impl.PGconn *)
+
+cdef class PGconn:
+ cdef impl.PGconn* pgconn_ptr
+
+ @staticmethod
+ cdef PGconn _from_ptr(impl.PGconn *ptr):
+ cdef PGconn rv = PGconn.__new__(PGconn)
+ rv.pgconn_ptr = ptr
+ return rv
+
+ def __cinit__(self):
+ self.pgconn_ptr = NULL
+
+ def __dealloc__(self):
+ self.finish()
+
+ @classmethod
+ def connect(cls, conninfo: bytes) -> PGconn:
+ return _connect(conninfo)
+
+ @classmethod
+ def connect_start(cls, conninfo: bytes) -> PGconn:
+ return _connect_start(conninfo)
+
+ def connect_poll(self) -> PollingStatus:
+ cdef int rv = self._call_int(<conn_int_f>impl.PQconnectPoll)
+ return PollingStatus(rv)
+
+ def finish(self) -> None:
+ if self.pgconn_ptr is not NULL:
+ impl.PQfinish(self.pgconn_ptr)
+ self.pgconn_ptr = NULL
+
+ @property
+ def info(self) -> List["ConninfoOption"]:
+ self._ensure_pgconn()
+ cdef impl.PQconninfoOption *opts = impl.PQconninfo(self.pgconn_ptr)
+ if opts is NULL:
+ raise MemoryError("couldn't allocate connection info")
+ try:
+ return _options_from_array(opts)
+ finally:
+ impl.PQconninfoFree(opts)
+
+ def reset(self) -> None:
+ self._ensure_pgconn()
+ impl.PQreset(self.pgconn_ptr)
+
+ def reset_start(self) -> None:
+ if not impl.PQresetStart(self.pgconn_ptr):
+ raise PQerror("couldn't reset connection")
+
+ def reset_poll(self) -> PollingStatus:
+ cdef int rv = self._call_int(<conn_int_f>impl.PQresetPoll)
+ return PollingStatus(rv)
+
+ @classmethod
+ def ping(self, conninfo: bytes) -> Ping:
+ cdef int rv = impl.PQping(conninfo)
+ return Ping(rv)
+
+ @property
+ def db(self) -> bytes:
+ return self._call_bytes(impl.PQdb)
+
+ @property
+ def user(self) -> bytes:
+ return self._call_bytes(impl.PQuser)
+
+ @property
+ def password(self) -> bytes:
+ return self._call_bytes(impl.PQpass)
+
+ @property
+ def host(self) -> bytes:
+ return self._call_bytes(impl.PQhost)
+
+ # @property
+ # def hostaddr(self) -> bytes:
+ # return self._call_bytes(impl.PQhostaddr)
+
+ @property
+ def port(self) -> bytes:
+ return self._call_bytes(impl.PQport)
+
+ @property
+ def tty(self) -> bytes:
+ return self._call_bytes(impl.PQtty)
+
+ @property
+ def options(self) -> bytes:
+ return self._call_bytes(impl.PQoptions)
+
+ @property
+ def status(self) -> ConnStatus:
+ cdef int rv = impl.PQstatus(self.pgconn_ptr)
+ return ConnStatus(rv)
+
+ @property
+ def transaction_status(self) -> TransactionStatus:
+ cdef int rv = impl.PQtransactionStatus(self.pgconn_ptr)
+ return TransactionStatus(rv)
+
+ def parameter_status(self, name: bytes) -> Optional[bytes]:
+ self._ensure_pgconn()
+ cdef const char *rv = impl.PQparameterStatus(self.pgconn_ptr, name)
+ if rv is not NULL:
+ return rv
+ else:
+ return None
+
+ @property
+ def error_message(self) -> bytes:
+ return impl.PQerrorMessage(self.pgconn_ptr)
+
+ @property
+ def protocol_version(self) -> int:
+ return self._call_int(impl.PQprotocolVersion)
+
+ @property
+ def server_version(self) -> int:
+ return self._call_int(impl.PQserverVersion)
+
+ @property
+ def socket(self) -> int:
+ return self._call_int(impl.PQsocket)
+
+ @property
+ def backend_pid(self) -> int:
+ return self._call_int(impl.PQbackendPID)
+
+ @property
+ def needs_password(self) -> bool:
+ return bool(self._call_int(impl.PQconnectionNeedsPassword))
+
+ @property
+ def used_password(self) -> bool:
+ return bool(self._call_int(impl.PQconnectionUsedPassword))
+
+ @property
+ def ssl_in_use(self) -> bool:
+ return bool(self._call_int(<conn_int_f>impl.PQsslInUse))
+
+ def exec_(self, command: bytes) -> PGresult:
+ self._ensure_pgconn()
+ cdef impl.PGresult *pgresult = impl.PQexec(self.pgconn_ptr, command)
+ if pgresult is NULL:
+ raise MemoryError("couldn't allocate PGresult")
+
+ return PGresult._from_ptr(pgresult)
+
+ def send_query(self, command: bytes) -> None:
+ self._ensure_pgconn()
+ if not impl.PQsendQuery(self.pgconn_ptr, command):
+ raise PQerror(
+ "sending query failed:"
+ f" {error_message(self)}"
+ )
+
+ def exec_params(
+ self,
+ command: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_types: Optional[Sequence[int]] = None,
+ param_formats: Optional[Sequence[Format]] = None,
+ result_format: Format = Format.TEXT,
+ ) -> PGresult:
+ self._ensure_pgconn()
+
+ cdef int cnparams
+ cdef Oid *ctypes
+ cdef char *const *cvalues
+ cdef int *clenghts
+ cdef int *cformats
+ cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
+ param_values, param_types, param_formats)
+
+ cdef impl.PGresult *pgresult = impl.PQexecParams(
+ self.pgconn_ptr, command, cnparams, ctypes,
+ <const char *const *>cvalues, clengths, cformats, result_format)
+ _clear_query_params(ctypes, cvalues, clengths, cformats)
+ if pgresult is NULL:
+ raise MemoryError("couldn't allocate PGresult")
+ return PGresult._from_ptr(pgresult)
+
+ def send_query_params(
+ self,
+ command: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_types: Optional[Sequence[int]] = None,
+ param_formats: Optional[Sequence[Format]] = None,
+ result_format: Format = Format.TEXT,
+ ) -> None:
+ self._ensure_pgconn()
+
+ cdef int cnparams
+ cdef Oid *ctypes
+ cdef char *const *cvalues
+ cdef int *clenghts
+ cdef int *cformats
+ cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
+ param_values, param_types, param_formats)
+
+ cdef int rv = impl.PQsendQueryParams(
+ self.pgconn_ptr, command, cnparams, ctypes,
+ <const char *const *>cvalues,
+ clengths, cformats, result_format)
+ _clear_query_params(ctypes, cvalues, clengths, cformats)
+ if not rv:
+ raise PQerror(
+ "sending query and params failed:"
+ f" {error_message(self)}"
+ )
+
+ def send_prepare(
+ self,
+ name: bytes,
+ command: bytes,
+ param_types: Optional[Sequence[int]] = None,
+ ) -> None:
+ self._ensure_pgconn()
+
+ cdef int i
+ cdef int nparams = len(param_types) if param_types else 0
+ cdef Oid *atypes = NULL
+ if nparams:
+ atypes = <Oid *>PyMem_Malloc(nparams * sizeof(Oid))
+ for i in range(nparams):
+ atypes[i] = param_types[i]
+
+ cdef int rv = impl.PQsendPrepare(
+ self.pgconn_ptr, name, command, nparams, atypes
+ )
+ PyMem_Free(atypes)
+ if not rv:
+ raise PQerror(
+ "sending query and params failed:"
+ f" {error_message(self)}"
+ )
+
+ def send_query_prepared(
+ self,
+ name: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_formats: Optional[Sequence[Format]] = None,
+ result_format: Format = Format.TEXT,
+ ) -> None:
+ self._ensure_pgconn()
+
+ cdef int cnparams
+ cdef Oid *ctypes
+ cdef char *const *cvalues
+ cdef int *clenghts
+ cdef int *cformats
+ cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
+ param_values, None, param_formats)
+
+ cdef int rv = impl.PQsendQueryPrepared(
+ self.pgconn_ptr, name, cnparams,
+ <const char *const *>cvalues,
+ clengths, cformats, result_format)
+ _clear_query_params(ctypes, cvalues, clengths, cformats)
+ if not rv:
+ raise PQerror(
+ "sending prepared query failed:"
+ f" {error_message(self)}"
+ )
+
+ def prepare(
+ self,
+ name: bytes,
+ command: bytes,
+ param_types: Optional[Sequence[int]] = None,
+ ) -> PGresult:
+ self._ensure_pgconn()
+
+ cdef int i
+ cdef int nparams = len(param_types) if param_types else 0
+ cdef Oid *atypes = NULL
+ if nparams:
+ atypes = <Oid *>PyMem_Malloc(nparams * sizeof(Oid))
+ for i in range(nparams):
+ atypes[i] = param_types[i]
+
+ cdef impl.PGresult *rv = impl.PQprepare(
+ self.pgconn_ptr, name, command, nparams, atypes)
+ PyMem_Free(atypes)
+ if rv is NULL:
+ raise MemoryError("couldn't allocate PGresult")
+ return PGresult._from_ptr(rv)
+
+ def exec_prepared(
+ self,
+ name: bytes,
+ param_values: Optional[Sequence[bytes]],
+ param_formats: Optional[Sequence[int]] = None,
+ result_format: int = 0,
+ ) -> PGresult:
+ self._ensure_pgconn()
+
+ cdef int cnparams
+ cdef Oid *ctypes
+ cdef char *const *cvalues
+ cdef int *clenghts
+ cdef int *cformats
+ cnparams, ctypes, cvalues, clengths, cformats = _query_params_args(
+ param_values, None, param_formats)
+
+ cdef impl.PGresult *rv = impl.PQexecPrepared(
+ self.pgconn_ptr, name, cnparams,
+ <const char *const *>cvalues,
+ clengths, cformats, result_format)
+
+ _clear_query_params(ctypes, cvalues, clengths, cformats)
+ if rv is NULL:
+ raise MemoryError("couldn't allocate PGresult")
+ return PGresult._from_ptr(rv)
+
+ def describe_prepared(self, name: bytes) -> PGresult:
+ self._ensure_pgconn()
+ cdef impl.PGresult *rv = impl.PQdescribePrepared(self.pgconn_ptr, name)
+ if rv is NULL:
+ raise MemoryError("couldn't allocate PGresult")
+ return PGresult._from_ptr(rv)
+
+ def describe_portal(self, name: bytes) -> PGresult:
+ self._ensure_pgconn()
+ cdef impl.PGresult *rv = impl.PQdescribePortal(self.pgconn_ptr, name)
+ if rv is NULL:
+ raise MemoryError("couldn't allocate PGresult")
+ return PGresult._from_ptr(rv)
+
+ def get_result(self) -> Optional["PGresult"]:
+ cdef impl.PGresult *pgresult = impl.PQgetResult(self.pgconn_ptr)
+ if pgresult is NULL:
+ return None
+ return PGresult._from_ptr(pgresult)
+
+ def consume_input(self) -> None:
+ if 1 != impl.PQconsumeInput(self.pgconn_ptr):
+ raise PQerror(
+ "consuming input failed:"
+ f" {error_message(self)}"
+ )
+
+ def is_busy(self) -> int:
+ return impl.PQisBusy(self.pgconn_ptr)
+
+ @property
+ def nonblocking(self) -> int:
+ return impl.PQisnonblocking(self.pgconn_ptr)
+
+ @nonblocking.setter
+ def nonblocking(self, arg: int) -> None:
+ if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg):
+ raise PQerror(
+ f"setting nonblocking failed:"
+ f" {error_message(self)}"
+ )
+
+ def flush(self) -> int:
+ cdef int rv = impl.PQflush(self.pgconn_ptr)
+ if rv < 0:
+ raise PQerror(
+ f"flushing failed:{error_message(self)}"
+ )
+ return rv
+
+ def make_empty_result(self, exec_status: ExecStatus) -> PGresult:
+ cdef impl.PGresult *rv = impl.PQmakeEmptyPGresult(
+ self.pgconn_ptr, exec_status)
+ if not rv:
+ raise MemoryError("couldn't allocate empty PGresult")
+ return PGresult._from_ptr(rv)
+
+ cdef int _ensure_pgconn(self) except 0:
+ if self.pgconn_ptr is not NULL:
+ return 1
+
+ raise PQerror("the connection is closed")
+
+ cdef char *_call_bytes(self, conn_bytes_f func) except NULL:
+ """
+ Call one of the pgconn libpq functions returning a bytes pointer.
+ """
+ if not self._ensure_pgconn():
+ return NULL
+ cdef char *rv = func(self.pgconn_ptr)
+ assert rv is not NULL
+ return rv
+
+ cdef int _call_int(self, conn_int_f func) except -1:
+ """
+ Call one of the pgconn libpq functions returning an int.
+ """
+ if not self._ensure_pgconn():
+ return -1
+ return func(self.pgconn_ptr)
+
+
+cdef PGconn _connect(const char *conninfo):
+ cdef impl.PGconn* pgconn = impl.PQconnectdb(conninfo)
+ if not pgconn:
+ raise MemoryError("couldn't allocate PGconn")
+
+ return PGconn._from_ptr(pgconn)
+
+
+cdef PGconn _connect_start(const char *conninfo):
+ cdef impl.PGconn* pgconn = impl.PQconnectStart(conninfo)
+ if not pgconn:
+ raise MemoryError("couldn't allocate PGconn")
+
+ return PGconn._from_ptr(pgconn)
+
+
+cdef (int, Oid *, char * const*, int *, int *) _query_params_args(
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_types: Optional[Sequence[int]],
+ param_formats: Optional[Sequence[Format]],
+) except *:
+ cdef int i
+
+ cdef int nparams = len(param_values) if param_values else 0
+ if param_types is not None and len(param_types) != nparams:
+ raise ValueError(
+ "got %d param_values but %d param_types"
+ % (nparams, len(param_types))
+ )
+ if param_formats is not None and len(param_formats) != nparams:
+ raise ValueError(
+ "got %d param_values but %d param_formats"
+ % (nparams, len(param_formats))
+ )
+
+ cdef char **aparams = NULL
+ cdef int *alenghts = NULL
+ if nparams:
+ aparams = <char **>PyMem_Malloc(nparams * sizeof(char *))
+ alenghts = <int *>PyMem_Malloc(nparams * sizeof(int))
+ for i in range(nparams):
+ if param_values[i] is not None:
+ aparams[i] = param_values[i]
+ alenghts[i] = len(param_values[i])
+ else:
+ aparams[i] = NULL
+ alenghts[i] = 0
+
+ cdef Oid *atypes = NULL
+ if param_types is not None:
+ atypes = <Oid *>PyMem_Malloc(nparams * sizeof(Oid))
+ for i in range(nparams):
+ atypes[i] = param_types[i]
+
+ cdef int *aformats = NULL
+ if param_formats is not None:
+ aformats = <int *>PyMem_Malloc(nparams * sizeof(int *))
+ for i in range(nparams):
+ aformats[i] = param_formats[i]
+
+ return (nparams, atypes, aparams, alenghts, aformats)
+
+
+cdef void _clear_query_params(
+ Oid *ctypes, char *const *cvalues, int *clenghts, int *cformats
+):
+ PyMem_Free(ctypes)
+ PyMem_Free(<char **>cvalues)
+ PyMem_Free(clenghts)
+ PyMem_Free(cformats)
+
+
+cdef _options_from_array(impl.PQconninfoOption *opts):
+ rv = []
+ cdef int i = 0
+ cdef impl.PQconninfoOption* opt
+ while 1:
+ opt = opts + i
+ if opt.keyword is NULL:
+ break
+ rv.append(
+ ConninfoOption(
+ keyword=opt.keyword,
+ envvar=opt.envvar if opt.envvar is not NULL else None,
+ compiled=opt.compiled if opt.compiled is not NULL else None,
+ val=opt.val if opt.val is not NULL else None,
+ label=opt.label if opt.label is not NULL else None,
+ dispchar=opt.dispchar if opt.dispchar is not NULL else None,
+ dispsize=opt.dispsize,
+ )
+ )
+ i += 1
+
+ return rv
+
+
+cdef class PGresult:
+ cdef impl.PGresult* pgresult_ptr
+
+ def __cinit__(self):
+ self.pgresult_ptr = NULL
+
+ @staticmethod
+ cdef PGresult _from_ptr(impl.PGresult *ptr):
+ cdef PGresult rv = PGresult.__new__(PGresult)
+ rv.pgresult_ptr = ptr
+ return rv
+
+ def __dealloc__(self) -> None:
+ self.clear()
+
+ def clear(self) -> None:
+ if self.pgresult_ptr is not NULL:
+ impl.PQclear(self.pgresult_ptr)
+ self.pgresult_ptr = NULL
+
+ @property
+ def status(self) -> ExecStatus:
+ cdef int rv = impl.PQresultStatus(self.pgresult_ptr)
+ return ExecStatus(rv)
+
+ @property
+ def error_message(self) -> bytes:
+ return impl.PQresultErrorMessage(self.pgresult_ptr)
+
+ def error_field(self, fieldcode: DiagnosticField) -> Optional[bytes]:
+ cdef char * rv = impl.PQresultErrorField(self.pgresult_ptr, fieldcode)
+ if rv is not NULL:
+ return rv
+ else:
+ return None
+
+ @property
+ def ntuples(self) -> int:
+ return impl.PQntuples(self.pgresult_ptr)
+
+ @property
+ def nfields(self) -> int:
+ return impl.PQnfields(self.pgresult_ptr)
+
+ def fname(self, column_number: int) -> Optional[bytes]:
+ cdef char *rv = impl.PQfname(self.pgresult_ptr, column_number)
+ if rv is not NULL:
+ return rv
+ else:
+ return None
+
+ def ftable(self, column_number: int) -> int:
+ return impl.PQftable(self.pgresult_ptr, column_number)
+
+ def ftablecol(self, column_number: int) -> int:
+ return impl.PQftablecol(self.pgresult_ptr, column_number)
+
+ def fformat(self, column_number: int) -> Format:
+ return Format(impl.PQfformat(self.pgresult_ptr, column_number))
+
+ def ftype(self, column_number: int) -> int:
+ return impl.PQftype(self.pgresult_ptr, column_number)
+
+ def fmod(self, column_number: int) -> int:
+ return impl.PQfmod(self.pgresult_ptr, column_number)
+
+ def fsize(self, column_number: int) -> int:
+ return impl.PQfsize(self.pgresult_ptr, column_number)
+
+ @property
+ def binary_tuples(self) -> Format:
+ return Format(impl.PQbinaryTuples(self.pgresult_ptr))
+
+ def get_value(
+ self, row_number: int, column_number: int
+ ) -> Optional[bytes]:
+ cdef int length = impl.PQgetlength(
+ self.pgresult_ptr, row_number, column_number
+ )
+ cdef char *v;
+ if length:
+ v = impl.PQgetvalue(self.pgresult_ptr, row_number, column_number)
+ # TODO: avoid copy
+ return v[:length]
+ else:
+ if impl.PQgetisnull(self.pgresult_ptr, row_number, column_number):
+ return None
+ else:
+ return b""
+
+ @property
+ def nparams(self) -> int:
+ return impl.PQnparams(self.pgresult_ptr)
+
+ def param_type(self, param_number: int) -> int:
+ return impl.PQparamtype(self.pgresult_ptr, param_number)
+
+ @property
+ def command_status(self) -> Optional[bytes]:
+ cdef char *rv = impl.PQcmdStatus(self.pgresult_ptr)
+ if rv is not NULL:
+ return rv
+ else:
+ return None
+
+ @property
+ def command_tuples(self) -> Optional[int]:
+ cdef char *rv = impl.PQcmdTuples(self.pgresult_ptr)
+ if rv is NULL:
+ return None
+ cdef bytes brv = rv
+ return int(brv) if brv else None
+
+ @property
+ def oid_value(self) -> int:
+ return impl.PQoidValue(self.pgresult_ptr)
+
+
+class Conninfo:
+ @classmethod
+ def get_defaults(cls) -> List[ConninfoOption]:
+ cdef impl.PQconninfoOption *opts = impl.PQconndefaults()
+ if opts is NULL :
+ raise MemoryError("couldn't allocate connection defaults")
+ try:
+ return _options_from_array(opts)
+ finally:
+ impl.PQconninfoFree(opts)
+
+ @classmethod
+ def parse(cls, conninfo: bytes) -> List[ConninfoOption]:
+ cdef char *errmsg = NULL
+ cdef impl.PQconninfoOption *rv = impl.PQconninfoParse(conninfo, &errmsg)
+ if rv is NULL:
+ if errmsg is NULL:
+ raise MemoryError("couldn't allocate on conninfo parse")
+ else:
+ exc = PQerror(errmsg.decode("utf8", "replace"))
+ impl.PQfreemem(errmsg)
+ raise exc
+
+ try:
+ return _options_from_array(rv)
+ finally:
+ impl.PQconninfoFree(rv)
+
+ def __repr__(self):
+ return f"<{type(self).__name__} ({self.keyword.decode('ascii')})>"
+
+
+cdef class Escaping:
+ cdef PGconn conn
+
+ def __init__(self, conn: Optional[PGconn] = None):
+ self.conn = conn
+
+ def escape_bytea(self, data: bytes) -> bytes:
+ cdef size_t len_out
+ cdef unsigned char *out
+ if self.conn is not None:
+ if self.conn.pgconn_ptr is NULL:
+ raise PQerror("the connection is closed")
+ out = impl.PQescapeByteaConn(
+ self.conn.pgconn_ptr, data, len(data), &len_out)
+ else:
+ out = impl.PQescapeBytea(data, len(data), &len_out)
+ if out is NULL:
+ raise MemoryError(
+ f"couldn't allocate for escape_bytea of {len(data)} bytes"
+ )
+
+ # TODO: without copy?
+ rv = out[:len_out - 1] # out includes final 0
+ impl.PQfreemem(out)
+ return rv
+
+ def unescape_bytea(self, data: bytes) -> bytes:
+ # not needed, but let's keep it symmetric with the escaping:
+ # if a connection is passed in, it must be valid.
+ if self.conn is not None:
+ if self.conn.pgconn_ptr is NULL:
+ raise PQerror("the connection is closed")
+
+ cdef size_t len_out
+ cdef unsigned char *out = impl.PQunescapeBytea(data, &len_out)
+ if out is NULL:
+ raise MemoryError(
+ f"couldn't allocate for unescape_bytea of {len(data)} bytes"
+ )
+
+ rv = out[:len_out]
+ impl.PQfreemem(out)
+ return rv
+