From: Daniele Varrazzo Date: Thu, 16 Apr 2020 14:04:23 +0000 (+1200) Subject: Added conforming psycopg3.pq implementation in cython X-Git-Tag: 3.0.dev0~554 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=6c6cbe303c55156eed6e2738df18dbc52e4aeb60;p=thirdparty%2Fpsycopg.git Added conforming psycopg3.pq implementation in cython --- diff --git a/psycopg3/pq/.gitignore b/psycopg3/pq/.gitignore new file mode 100644 index 000000000..8b6838605 --- /dev/null +++ b/psycopg3/pq/.gitignore @@ -0,0 +1,3 @@ +pq_cython.c +pq_cython.cpython-36m-x86_64-linux-gnu.so +pq_cython.html diff --git a/psycopg3/pq/__init__.py b/psycopg3/pq/__init__.py index c7816b925..c5d7a714d 100644 --- a/psycopg3/pq/__init__.py +++ b/psycopg3/pq/__init__.py @@ -9,6 +9,8 @@ implementation-dependant but all the implementations share the same interface. # Copyright (C) 2020 The Psycopg Team +import os +import logging from types import ModuleType from .enums import ( @@ -23,18 +25,50 @@ from .enums import ( from .encodings import py_codecs from .misc import error_message, ConninfoOption +logger = logging.getLogger(__name__) + def import_libpq() -> ModuleType: """ Find the best libpw wrapper available. """ - from . import pq_ctypes + impl = os.environ.get("PSYCOPG3_IMPL", "").lower() + + if not impl or impl == "c": + try: + from . import pq_cython + except Exception as e: + if not impl: + logger.debug(f"C pq wrapper not available: %s", e) + else: + raise ImportError( + f"requested pq implementation '{impl}' not available" + ) from e + else: + return pq_cython + + if not impl or impl == "ctypes": + try: + from . import pq_ctypes + except Exception as e: + if not impl: + logger.debug(f"ctypes pq wrapper not available: %s", e) + else: + raise ImportError( + f"requested pq implementation '{impl}' not available" + ) from e + else: + return pq_ctypes - return pq_ctypes + if impl: + raise ImportError(f"requested pq impementation '{impl}' unknown") + else: + raise ImportError(f"no pq wrapper available") pq_module = import_libpq() +__impl__ = pq_module.__impl__ version = pq_module.version PGconn = pq_module.PGconn PGresult = pq_module.PGresult diff --git a/psycopg3/pq/libpq.pxd b/psycopg3/pq/libpq.pxd new file mode 100644 index 000000000..9cf5b71c2 --- /dev/null +++ b/psycopg3/pq/libpq.pxd @@ -0,0 +1,216 @@ +""" +Libpq header definition for the cython psycopg3.pq implementation. +""" + +# Copyright (C) 2020 The Psycopg Team + +cdef extern from "libpq-fe.h": + int PQlibVersion() + + # structures and types + + ctypedef unsigned int Oid + + ctypedef struct PGconn: + pass + + ctypedef struct PGresult: + pass + + ctypedef struct PQconninfoOption: + char *keyword + char *envvar + char *compiled + char *val + char *label + char *dispchar + int dispsize + + # enums + + ctypedef enum PostgresPollingStatusType: + PGRES_POLLING_FAILED = 0 + PGRES_POLLING_READING + PGRES_POLLING_WRITING + PGRES_POLLING_OK + PGRES_POLLING_ACTIVE + + + ctypedef enum PGPing: + PQPING_OK + PQPING_REJECT + PQPING_NO_RESPONSE + PQPING_NO_ATTEMPT + + ctypedef enum ConnStatusType: + CONNECTION_OK + CONNECTION_BAD + CONNECTION_STARTED + CONNECTION_MADE + CONNECTION_AWAITING_RESPONSE + CONNECTION_AUTH_OK + CONNECTION_SETENV + CONNECTION_SSL_STARTUP + CONNECTION_NEEDED + CONNECTION_CHECK_WRITABLE + CONNECTION_GSS_STARTUP + # CONNECTION_CHECK_TARGET PG 12 + + ctypedef enum PGTransactionStatusType: + PQTRANS_IDLE + PQTRANS_ACTIVE + PQTRANS_INTRANS + PQTRANS_INERROR + PQTRANS_UNKNOWN + + ctypedef enum ExecStatusType: + PGRES_EMPTY_QUERY = 0 + PGRES_COMMAND_OK + PGRES_TUPLES_OK + PGRES_COPY_OUT + PGRES_COPY_IN + PGRES_BAD_RESPONSE + PGRES_NONFATAL_ERROR + PGRES_FATAL_ERROR + PGRES_COPY_BOTH + PGRES_SINGLE_TUPLE + + # 33.1. Database Connection Control Functions + PGconn *PQconnectdb(const char *conninfo) + PGconn *PQconnectStart(const char *conninfo) + PostgresPollingStatusType PQconnectPoll(PGconn *conn) + PQconninfoOption *PQconndefaults() + PQconninfoOption *PQconninfo(PGconn *conn) + PQconninfoOption *PQconninfoParse(const char *conninfo, char **errmsg) + void PQfinish(PGconn *conn) + void PQreset(PGconn *conn) + int PQresetStart(PGconn *conn) + PostgresPollingStatusType PQresetPoll(PGconn *conn) + PGPing PQping(const char *conninfo) + + # 33.2. Connection Status Functions + char *PQdb(const PGconn *conn) + char *PQuser(const PGconn *conn) + char *PQpass(const PGconn *conn) + char *PQhost(const PGconn *conn) + # char *PQhostaddr(const PGconn *conn) TODO: conditional, only libpq>=12 + char *PQport(const PGconn *conn) + char *PQtty(const PGconn *conn) + char *PQoptions(const PGconn *conn) + ConnStatusType PQstatus(const PGconn *conn) + PGTransactionStatusType PQtransactionStatus(const PGconn *conn) + const char *PQparameterStatus(const PGconn *conn, const char *paramName) + int PQprotocolVersion(const PGconn *conn) + int PQserverVersion(const PGconn *conn) + char *PQerrorMessage(const PGconn *conn) + int PQsocket(const PGconn *conn) + int PQbackendPID(const PGconn *conn) + int PQconnectionNeedsPassword(const PGconn *conn) + int PQconnectionUsedPassword(const PGconn *conn) + int PQsslInUse(PGconn *conn) # TODO: const in PG 12 docs - verify/report + # TODO: PQsslAttribute, PQsslAttributeNames, PQsslStruct, PQgetssl + + # 33.3. Command Execution Functions + PGresult *PQexec(PGconn *conn, const char *command) + PGresult *PQexecParams(PGconn *conn, + const char *command, + int nParams, + const Oid *paramTypes, + const char * const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat) + PGresult *PQprepare(PGconn *conn, + const char *stmtName, + const char *query, + int nParams, + const Oid *paramTypes) + PGresult *PQexecPrepared(PGconn *conn, + const char *stmtName, + int nParams, + const char * const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat) + PGresult *PQdescribePrepared(PGconn *conn, const char *stmtName) + PGresult *PQdescribePortal(PGconn *conn, const char *portalName) + ExecStatusType PQresultStatus(const PGresult *res) + # PQresStatus: not needed, we have pretty enums + char *PQresultErrorMessage(const PGresult *res) + # TODO: PQresultVerboseErrorMessage + char *PQresultErrorField(const PGresult *res, int fieldcode) + void PQclear(PGresult *res) + + # 33.3.2. Retrieving Query Result Information + int PQntuples(const PGresult *res) + int PQnfields(const PGresult *res) + char *PQfname(const PGresult *res, int column_number) + int PQfnumber(const PGresult *res, const char *column_name) + Oid PQftable(const PGresult *res, int column_number) + int PQftablecol(const PGresult *res, int column_number) + int PQfformat(const PGresult *res, int column_number) + Oid PQftype(const PGresult *res, int column_number) + int PQfmod(const PGresult *res, int column_number) + int PQfsize(const PGresult *res, int column_number) + int PQbinaryTuples(const PGresult *res) + char *PQgetvalue(const PGresult *res, int row_number, int column_number) + int PQgetisnull(const PGresult *res, int row_number, int column_number) + int PQgetlength(const PGresult *res, int row_number, int column_number) + int PQnparams(const PGresult *res) + Oid PQparamtype(const PGresult *res, int param_number) + # PQprint: pretty useless + + # 33.3.3. Retrieving Other Result Information + char *PQcmdStatus(PGresult *res) + char *PQcmdTuples(PGresult *res) + Oid PQoidValue(const PGresult *res) + + # 33.3.4. Escaping Strings for Inclusion in SQL Commands + # TODO: PQescapeLiteral PQescapeIdentifier PQescapeStringConn PQescapeString + unsigned char *PQescapeByteaConn(PGconn *conn, + const unsigned char *src, + size_t from_length, + size_t *to_length) + unsigned char *PQescapeBytea(const unsigned char *src, + size_t from_length, + size_t *to_length) + unsigned char *PQunescapeBytea(const unsigned char *src, size_t *to_length) + + + # 33.4. Asynchronous Command Processing + int PQsendQuery(PGconn *conn, const char *command) + int PQsendQueryParams(PGconn *conn, + const char *command, + int nParams, + const Oid *paramTypes, + const char * const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat) + int PQsendPrepare(PGconn *conn, + const char *stmtName, + const char *query, + int nParams, + const Oid *paramTypes) + int PQsendQueryPrepared(PGconn *conn, + const char *stmtName, + int nParams, + const char * const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat) + int PQsendDescribePrepared(PGconn *conn, const char *stmtName) + int PQsendDescribePortal(PGconn *conn, const char *portalName) + PGresult *PQgetResult(PGconn *conn) + int PQconsumeInput(PGconn *conn) + int PQisBusy(PGconn *conn) + int PQsetnonblocking(PGconn *conn, int arg) + int PQisnonblocking(const PGconn *conn) + int PQflush(PGconn *conn) + + # 33.11. Miscellaneous Functions + void PQfreemem(void *ptr) + void PQconninfoFree(PQconninfoOption *connOptions) + PGresult *PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) + int PQlibVersion() + diff --git a/psycopg3/pq/pq_cython.pyx b/psycopg3/pq/pq_cython.pyx new file mode 100644 index 000000000..68760cf7d --- /dev/null +++ b/psycopg3/pq/pq_cython.pyx @@ -0,0 +1,725 @@ +""" +libpq Python wrapper using cython bindings. +""" + +# Copyright (C) 2020 The Psycopg Team +from cpython.mem cimport PyMem_Malloc, PyMem_Free + +from psycopg3.pq cimport libpq as impl +from psycopg3.errors import OperationalError + +from .misc import error_message, ConninfoOption +from .enums import ( + ConnStatus, + PollingStatus, + ExecStatus, + TransactionStatus, + Ping, + DiagnosticField, + Format, +) + + +__impl__ = 'c' + + +class PQerror(OperationalError): + pass + + +def version(): + return impl.PQlibVersion() + +ctypedef unsigned int Oid; +ctypedef char *(*conn_bytes_f) (const impl.PGconn *) +ctypedef int(*conn_int_f) (const impl.PGconn *) + +cdef class PGconn: + cdef impl.PGconn* pgconn_ptr + + @staticmethod + cdef PGconn _from_ptr(impl.PGconn *ptr): + cdef PGconn rv = PGconn.__new__(PGconn) + rv.pgconn_ptr = ptr + return rv + + def __cinit__(self): + self.pgconn_ptr = NULL + + def __dealloc__(self): + self.finish() + + @classmethod + def connect(cls, conninfo: bytes) -> PGconn: + return _connect(conninfo) + + @classmethod + def connect_start(cls, conninfo: bytes) -> PGconn: + return _connect_start(conninfo) + + def connect_poll(self) -> PollingStatus: + cdef int rv = self._call_int(impl.PQconnectPoll) + return PollingStatus(rv) + + def finish(self) -> None: + if self.pgconn_ptr is not NULL: + impl.PQfinish(self.pgconn_ptr) + self.pgconn_ptr = NULL + + @property + def info(self) -> List["ConninfoOption"]: + self._ensure_pgconn() + cdef impl.PQconninfoOption *opts = impl.PQconninfo(self.pgconn_ptr) + if opts is NULL: + raise MemoryError("couldn't allocate connection info") + try: + return _options_from_array(opts) + finally: + impl.PQconninfoFree(opts) + + def reset(self) -> None: + self._ensure_pgconn() + impl.PQreset(self.pgconn_ptr) + + def reset_start(self) -> None: + if not impl.PQresetStart(self.pgconn_ptr): + raise PQerror("couldn't reset connection") + + def reset_poll(self) -> PollingStatus: + cdef int rv = self._call_int(impl.PQresetPoll) + return PollingStatus(rv) + + @classmethod + def ping(self, conninfo: bytes) -> Ping: + cdef int rv = impl.PQping(conninfo) + return Ping(rv) + + @property + def db(self) -> bytes: + return self._call_bytes(impl.PQdb) + + @property + def user(self) -> bytes: + return self._call_bytes(impl.PQuser) + + @property + def password(self) -> bytes: + return self._call_bytes(impl.PQpass) + + @property + def host(self) -> bytes: + return self._call_bytes(impl.PQhost) + + # @property + # def hostaddr(self) -> bytes: + # return self._call_bytes(impl.PQhostaddr) + + @property + def port(self) -> bytes: + return self._call_bytes(impl.PQport) + + @property + def tty(self) -> bytes: + return self._call_bytes(impl.PQtty) + + @property + def options(self) -> bytes: + return self._call_bytes(impl.PQoptions) + + @property + def status(self) -> ConnStatus: + cdef int rv = impl.PQstatus(self.pgconn_ptr) + return ConnStatus(rv) + + @property + def transaction_status(self) -> TransactionStatus: + cdef int rv = impl.PQtransactionStatus(self.pgconn_ptr) + return TransactionStatus(rv) + + def parameter_status(self, name: bytes) -> Optional[bytes]: + self._ensure_pgconn() + cdef const char *rv = impl.PQparameterStatus(self.pgconn_ptr, name) + if rv is not NULL: + return rv + else: + return None + + @property + def error_message(self) -> bytes: + return impl.PQerrorMessage(self.pgconn_ptr) + + @property + def protocol_version(self) -> int: + return self._call_int(impl.PQprotocolVersion) + + @property + def server_version(self) -> int: + return self._call_int(impl.PQserverVersion) + + @property + def socket(self) -> int: + return self._call_int(impl.PQsocket) + + @property + def backend_pid(self) -> int: + return self._call_int(impl.PQbackendPID) + + @property + def needs_password(self) -> bool: + return bool(self._call_int(impl.PQconnectionNeedsPassword)) + + @property + def used_password(self) -> bool: + return bool(self._call_int(impl.PQconnectionUsedPassword)) + + @property + def ssl_in_use(self) -> bool: + return bool(self._call_int(impl.PQsslInUse)) + + def exec_(self, command: bytes) -> PGresult: + self._ensure_pgconn() + cdef impl.PGresult *pgresult = impl.PQexec(self.pgconn_ptr, command) + if pgresult is NULL: + raise MemoryError("couldn't allocate PGresult") + + return PGresult._from_ptr(pgresult) + + def send_query(self, command: bytes) -> None: + self._ensure_pgconn() + if not impl.PQsendQuery(self.pgconn_ptr, command): + raise PQerror( + "sending query failed:" + f" {error_message(self)}" + ) + + def exec_params( + self, + command: bytes, + param_values: Optional[Sequence[Optional[bytes]]], + param_types: Optional[Sequence[int]] = None, + param_formats: Optional[Sequence[Format]] = None, + result_format: Format = Format.TEXT, + ) -> PGresult: + self._ensure_pgconn() + + cdef int cnparams + cdef Oid *ctypes + cdef char *const *cvalues + cdef int *clenghts + cdef int *cformats + cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( + param_values, param_types, param_formats) + + cdef impl.PGresult *pgresult = impl.PQexecParams( + self.pgconn_ptr, command, cnparams, ctypes, + cvalues, clengths, cformats, result_format) + _clear_query_params(ctypes, cvalues, clengths, cformats) + if pgresult is NULL: + raise MemoryError("couldn't allocate PGresult") + return PGresult._from_ptr(pgresult) + + def send_query_params( + self, + command: bytes, + param_values: Optional[Sequence[Optional[bytes]]], + param_types: Optional[Sequence[int]] = None, + param_formats: Optional[Sequence[Format]] = None, + result_format: Format = Format.TEXT, + ) -> None: + self._ensure_pgconn() + + cdef int cnparams + cdef Oid *ctypes + cdef char *const *cvalues + cdef int *clenghts + cdef int *cformats + cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( + param_values, param_types, param_formats) + + cdef int rv = impl.PQsendQueryParams( + self.pgconn_ptr, command, cnparams, ctypes, + cvalues, + clengths, cformats, result_format) + _clear_query_params(ctypes, cvalues, clengths, cformats) + if not rv: + raise PQerror( + "sending query and params failed:" + f" {error_message(self)}" + ) + + def send_prepare( + self, + name: bytes, + command: bytes, + param_types: Optional[Sequence[int]] = None, + ) -> None: + self._ensure_pgconn() + + cdef int i + cdef int nparams = len(param_types) if param_types else 0 + cdef Oid *atypes = NULL + if nparams: + atypes = PyMem_Malloc(nparams * sizeof(Oid)) + for i in range(nparams): + atypes[i] = param_types[i] + + cdef int rv = impl.PQsendPrepare( + self.pgconn_ptr, name, command, nparams, atypes + ) + PyMem_Free(atypes) + if not rv: + raise PQerror( + "sending query and params failed:" + f" {error_message(self)}" + ) + + def send_query_prepared( + self, + name: bytes, + param_values: Optional[Sequence[Optional[bytes]]], + param_formats: Optional[Sequence[Format]] = None, + result_format: Format = Format.TEXT, + ) -> None: + self._ensure_pgconn() + + cdef int cnparams + cdef Oid *ctypes + cdef char *const *cvalues + cdef int *clenghts + cdef int *cformats + cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( + param_values, None, param_formats) + + cdef int rv = impl.PQsendQueryPrepared( + self.pgconn_ptr, name, cnparams, + cvalues, + clengths, cformats, result_format) + _clear_query_params(ctypes, cvalues, clengths, cformats) + if not rv: + raise PQerror( + "sending prepared query failed:" + f" {error_message(self)}" + ) + + def prepare( + self, + name: bytes, + command: bytes, + param_types: Optional[Sequence[int]] = None, + ) -> PGresult: + self._ensure_pgconn() + + cdef int i + cdef int nparams = len(param_types) if param_types else 0 + cdef Oid *atypes = NULL + if nparams: + atypes = PyMem_Malloc(nparams * sizeof(Oid)) + for i in range(nparams): + atypes[i] = param_types[i] + + cdef impl.PGresult *rv = impl.PQprepare( + self.pgconn_ptr, name, command, nparams, atypes) + PyMem_Free(atypes) + if rv is NULL: + raise MemoryError("couldn't allocate PGresult") + return PGresult._from_ptr(rv) + + def exec_prepared( + self, + name: bytes, + param_values: Optional[Sequence[bytes]], + param_formats: Optional[Sequence[int]] = None, + result_format: int = 0, + ) -> PGresult: + self._ensure_pgconn() + + cdef int cnparams + cdef Oid *ctypes + cdef char *const *cvalues + cdef int *clenghts + cdef int *cformats + cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( + param_values, None, param_formats) + + cdef impl.PGresult *rv = impl.PQexecPrepared( + self.pgconn_ptr, name, cnparams, + cvalues, + clengths, cformats, result_format) + + _clear_query_params(ctypes, cvalues, clengths, cformats) + if rv is NULL: + raise MemoryError("couldn't allocate PGresult") + return PGresult._from_ptr(rv) + + def describe_prepared(self, name: bytes) -> PGresult: + self._ensure_pgconn() + cdef impl.PGresult *rv = impl.PQdescribePrepared(self.pgconn_ptr, name) + if rv is NULL: + raise MemoryError("couldn't allocate PGresult") + return PGresult._from_ptr(rv) + + def describe_portal(self, name: bytes) -> PGresult: + self._ensure_pgconn() + cdef impl.PGresult *rv = impl.PQdescribePortal(self.pgconn_ptr, name) + if rv is NULL: + raise MemoryError("couldn't allocate PGresult") + return PGresult._from_ptr(rv) + + def get_result(self) -> Optional["PGresult"]: + cdef impl.PGresult *pgresult = impl.PQgetResult(self.pgconn_ptr) + if pgresult is NULL: + return None + return PGresult._from_ptr(pgresult) + + def consume_input(self) -> None: + if 1 != impl.PQconsumeInput(self.pgconn_ptr): + raise PQerror( + "consuming input failed:" + f" {error_message(self)}" + ) + + def is_busy(self) -> int: + return impl.PQisBusy(self.pgconn_ptr) + + @property + def nonblocking(self) -> int: + return impl.PQisnonblocking(self.pgconn_ptr) + + @nonblocking.setter + def nonblocking(self, arg: int) -> None: + if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg): + raise PQerror( + f"setting nonblocking failed:" + f" {error_message(self)}" + ) + + def flush(self) -> int: + cdef int rv = impl.PQflush(self.pgconn_ptr) + if rv < 0: + raise PQerror( + f"flushing failed:{error_message(self)}" + ) + return rv + + def make_empty_result(self, exec_status: ExecStatus) -> PGresult: + cdef impl.PGresult *rv = impl.PQmakeEmptyPGresult( + self.pgconn_ptr, exec_status) + if not rv: + raise MemoryError("couldn't allocate empty PGresult") + return PGresult._from_ptr(rv) + + cdef int _ensure_pgconn(self) except 0: + if self.pgconn_ptr is not NULL: + return 1 + + raise PQerror("the connection is closed") + + cdef char *_call_bytes(self, conn_bytes_f func) except NULL: + """ + Call one of the pgconn libpq functions returning a bytes pointer. + """ + if not self._ensure_pgconn(): + return NULL + cdef char *rv = func(self.pgconn_ptr) + assert rv is not NULL + return rv + + cdef int _call_int(self, conn_int_f func) except -1: + """ + Call one of the pgconn libpq functions returning an int. + """ + if not self._ensure_pgconn(): + return -1 + return func(self.pgconn_ptr) + + +cdef PGconn _connect(const char *conninfo): + cdef impl.PGconn* pgconn = impl.PQconnectdb(conninfo) + if not pgconn: + raise MemoryError("couldn't allocate PGconn") + + return PGconn._from_ptr(pgconn) + + +cdef PGconn _connect_start(const char *conninfo): + cdef impl.PGconn* pgconn = impl.PQconnectStart(conninfo) + if not pgconn: + raise MemoryError("couldn't allocate PGconn") + + return PGconn._from_ptr(pgconn) + + +cdef (int, Oid *, char * const*, int *, int *) _query_params_args( + param_values: Optional[Sequence[Optional[bytes]]], + param_types: Optional[Sequence[int]], + param_formats: Optional[Sequence[Format]], +) except *: + cdef int i + + cdef int nparams = len(param_values) if param_values else 0 + if param_types is not None and len(param_types) != nparams: + raise ValueError( + "got %d param_values but %d param_types" + % (nparams, len(param_types)) + ) + if param_formats is not None and len(param_formats) != nparams: + raise ValueError( + "got %d param_values but %d param_formats" + % (nparams, len(param_formats)) + ) + + cdef char **aparams = NULL + cdef int *alenghts = NULL + if nparams: + aparams = PyMem_Malloc(nparams * sizeof(char *)) + alenghts = PyMem_Malloc(nparams * sizeof(int)) + for i in range(nparams): + if param_values[i] is not None: + aparams[i] = param_values[i] + alenghts[i] = len(param_values[i]) + else: + aparams[i] = NULL + alenghts[i] = 0 + + cdef Oid *atypes = NULL + if param_types is not None: + atypes = PyMem_Malloc(nparams * sizeof(Oid)) + for i in range(nparams): + atypes[i] = param_types[i] + + cdef int *aformats = NULL + if param_formats is not None: + aformats = PyMem_Malloc(nparams * sizeof(int *)) + for i in range(nparams): + aformats[i] = param_formats[i] + + return (nparams, atypes, aparams, alenghts, aformats) + + +cdef void _clear_query_params( + Oid *ctypes, char *const *cvalues, int *clenghts, int *cformats +): + PyMem_Free(ctypes) + PyMem_Free(cvalues) + PyMem_Free(clenghts) + PyMem_Free(cformats) + + +cdef _options_from_array(impl.PQconninfoOption *opts): + rv = [] + cdef int i = 0 + cdef impl.PQconninfoOption* opt + while 1: + opt = opts + i + if opt.keyword is NULL: + break + rv.append( + ConninfoOption( + keyword=opt.keyword, + envvar=opt.envvar if opt.envvar is not NULL else None, + compiled=opt.compiled if opt.compiled is not NULL else None, + val=opt.val if opt.val is not NULL else None, + label=opt.label if opt.label is not NULL else None, + dispchar=opt.dispchar if opt.dispchar is not NULL else None, + dispsize=opt.dispsize, + ) + ) + i += 1 + + return rv + + +cdef class PGresult: + cdef impl.PGresult* pgresult_ptr + + def __cinit__(self): + self.pgresult_ptr = NULL + + @staticmethod + cdef PGresult _from_ptr(impl.PGresult *ptr): + cdef PGresult rv = PGresult.__new__(PGresult) + rv.pgresult_ptr = ptr + return rv + + def __dealloc__(self) -> None: + self.clear() + + def clear(self) -> None: + if self.pgresult_ptr is not NULL: + impl.PQclear(self.pgresult_ptr) + self.pgresult_ptr = NULL + + @property + def status(self) -> ExecStatus: + cdef int rv = impl.PQresultStatus(self.pgresult_ptr) + return ExecStatus(rv) + + @property + def error_message(self) -> bytes: + return impl.PQresultErrorMessage(self.pgresult_ptr) + + def error_field(self, fieldcode: DiagnosticField) -> Optional[bytes]: + cdef char * rv = impl.PQresultErrorField(self.pgresult_ptr, fieldcode) + if rv is not NULL: + return rv + else: + return None + + @property + def ntuples(self) -> int: + return impl.PQntuples(self.pgresult_ptr) + + @property + def nfields(self) -> int: + return impl.PQnfields(self.pgresult_ptr) + + def fname(self, column_number: int) -> Optional[bytes]: + cdef char *rv = impl.PQfname(self.pgresult_ptr, column_number) + if rv is not NULL: + return rv + else: + return None + + def ftable(self, column_number: int) -> int: + return impl.PQftable(self.pgresult_ptr, column_number) + + def ftablecol(self, column_number: int) -> int: + return impl.PQftablecol(self.pgresult_ptr, column_number) + + def fformat(self, column_number: int) -> Format: + return Format(impl.PQfformat(self.pgresult_ptr, column_number)) + + def ftype(self, column_number: int) -> int: + return impl.PQftype(self.pgresult_ptr, column_number) + + def fmod(self, column_number: int) -> int: + return impl.PQfmod(self.pgresult_ptr, column_number) + + def fsize(self, column_number: int) -> int: + return impl.PQfsize(self.pgresult_ptr, column_number) + + @property + def binary_tuples(self) -> Format: + return Format(impl.PQbinaryTuples(self.pgresult_ptr)) + + def get_value( + self, row_number: int, column_number: int + ) -> Optional[bytes]: + cdef int length = impl.PQgetlength( + self.pgresult_ptr, row_number, column_number + ) + cdef char *v; + if length: + v = impl.PQgetvalue(self.pgresult_ptr, row_number, column_number) + # TODO: avoid copy + return v[:length] + else: + if impl.PQgetisnull(self.pgresult_ptr, row_number, column_number): + return None + else: + return b"" + + @property + def nparams(self) -> int: + return impl.PQnparams(self.pgresult_ptr) + + def param_type(self, param_number: int) -> int: + return impl.PQparamtype(self.pgresult_ptr, param_number) + + @property + def command_status(self) -> Optional[bytes]: + cdef char *rv = impl.PQcmdStatus(self.pgresult_ptr) + if rv is not NULL: + return rv + else: + return None + + @property + def command_tuples(self) -> Optional[int]: + cdef char *rv = impl.PQcmdTuples(self.pgresult_ptr) + if rv is NULL: + return None + cdef bytes brv = rv + return int(brv) if brv else None + + @property + def oid_value(self) -> int: + return impl.PQoidValue(self.pgresult_ptr) + + +class Conninfo: + @classmethod + def get_defaults(cls) -> List[ConninfoOption]: + cdef impl.PQconninfoOption *opts = impl.PQconndefaults() + if opts is NULL : + raise MemoryError("couldn't allocate connection defaults") + try: + return _options_from_array(opts) + finally: + impl.PQconninfoFree(opts) + + @classmethod + def parse(cls, conninfo: bytes) -> List[ConninfoOption]: + cdef char *errmsg = NULL + cdef impl.PQconninfoOption *rv = impl.PQconninfoParse(conninfo, &errmsg) + if rv is NULL: + if errmsg is NULL: + raise MemoryError("couldn't allocate on conninfo parse") + else: + exc = PQerror(errmsg.decode("utf8", "replace")) + impl.PQfreemem(errmsg) + raise exc + + try: + return _options_from_array(rv) + finally: + impl.PQconninfoFree(rv) + + def __repr__(self): + return f"<{type(self).__name__} ({self.keyword.decode('ascii')})>" + + +cdef class Escaping: + cdef PGconn conn + + def __init__(self, conn: Optional[PGconn] = None): + self.conn = conn + + def escape_bytea(self, data: bytes) -> bytes: + cdef size_t len_out + cdef unsigned char *out + if self.conn is not None: + if self.conn.pgconn_ptr is NULL: + raise PQerror("the connection is closed") + out = impl.PQescapeByteaConn( + self.conn.pgconn_ptr, data, len(data), &len_out) + else: + out = impl.PQescapeBytea(data, len(data), &len_out) + if out is NULL: + raise MemoryError( + f"couldn't allocate for escape_bytea of {len(data)} bytes" + ) + + # TODO: without copy? + rv = out[:len_out - 1] # out includes final 0 + impl.PQfreemem(out) + return rv + + def unescape_bytea(self, data: bytes) -> bytes: + # not needed, but let's keep it symmetric with the escaping: + # if a connection is passed in, it must be valid. + if self.conn is not None: + if self.conn.pgconn_ptr is NULL: + raise PQerror("the connection is closed") + + cdef size_t len_out + cdef unsigned char *out = impl.PQunescapeBytea(data, &len_out) + if out is NULL: + raise MemoryError( + f"couldn't allocate for unescape_bytea of {len(data)} bytes" + ) + + rv = out[:len_out] + impl.PQfreemem(out) + return rv + diff --git a/setup.py b/setup.py index 0a001d93c..85ec9a6ae 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,15 @@ psycopg3 -- PostgreSQL database adapter for Python import re import os -from setuptools import setup, find_packages # type: ignore +import subprocess as sp +from setuptools import setup, find_packages, Extension # type: ignore +from distutils.command.build_ext import build_ext # type: ignore +from distutils import log + +try: + from Cython.Build import cythonize # type: ignore +except ImportError: + cythonize = None # Grab the version without importing the module # or we will get import errors on install if prerequisites are still missing @@ -35,6 +43,47 @@ Topic :: Software Development Topic :: Software Development :: Libraries :: Python Modules """ + +class our_build_ext(build_ext): # type: ignore + def finalize_options(self) -> None: + self._setup_ext_build() + super().finalize_options() + + def run(self) -> None: + super().run() + + def _setup_ext_build(self) -> None: + try: + from Cython.Build import cythonize + except ImportError: + log.warn( + "Cython is not available: the C module will not be built", () + ) + return + + try: + out = sp.run( + ["pg_config", f"--includedir"], stdout=sp.PIPE, check=True + ) + except Exception as e: + log.warn("cannot build C module: %s", (e,)) + return + + includedir = out.stdout.strip().decode("utf8") + + ext = Extension( + "psycopg3.pq.pq_cython", + ["psycopg3/pq/pq_cython.pyx"], + libraries=["pq"], + include_dirs=[includedir], + ) + self.distribution.ext_modules = cythonize( + [ext], + language_level=3, + # annotate=True, # enable to get an html view of the C module + ) + + setup( name="psycopg3", description=readme.splitlines()[0], @@ -45,6 +94,8 @@ setup( python_requires=">=3.6", packages=find_packages(exclude=["tests"]), classifiers=[x for x in classifiers.split("\n") if x], + setup_requires=["Cython"], + install_requires=["Cython"], zip_safe=False, version=version, project_urls={ @@ -53,4 +104,5 @@ setup( "Issue Tracker": "https://github.com/psycopg/psycopg3/issues", "Download": "https://pypi.org/project/psycopg3/", }, + cmdclass={"build_ext": our_build_ext}, ) diff --git a/tests/pq/test_pgconn.py b/tests/pq/test_pgconn.py index c3cb4c89b..3e8593dea 100644 --- a/tests/pq/test_pgconn.py +++ b/tests/pq/test_pgconn.py @@ -166,6 +166,8 @@ def test_host(pgconn): pgconn.host +# TODO: to implement in psycopg3.pq.pq_cython +@pytest.mark.xfail @pytest.mark.libpq(">= 12") def test_hostaddr(pgconn): # not in info