From: Daniele Varrazzo Date: Sun, 20 Dec 2020 23:58:11 +0000 (+0100) Subject: PGconn code moved to pq directory X-Git-Tag: 3.0.dev0~252^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cb57c609ac3d92ff35fb0bf718d6209eb30e7aec;p=thirdparty%2Fpsycopg.git PGconn code moved to pq directory --- diff --git a/psycopg3_c/psycopg3_c/pq/pgconn.pyx b/psycopg3_c/psycopg3_c/pq/pgconn.pyx new file mode 100644 index 000000000..b737f0e30 --- /dev/null +++ b/psycopg3_c/psycopg3_c/pq/pgconn.pyx @@ -0,0 +1,469 @@ +""" +psycopg3_c.pq_cython.PGconn object implementation. +""" + +# Copyright (C) 2020 The Psycopg Team + + +cdef class PGconn: + @staticmethod + cdef PGconn _from_ptr(impl.PGconn *ptr): + cdef PGconn rv = PGconn.__new__(PGconn) + rv.pgconn_ptr = ptr + + impl.PQsetNoticeReceiver(ptr, notice_receiver, rv) + return rv + + def __cinit__(self): + self.pgconn_ptr = NULL + self._procpid = getpid() + + def __dealloc__(self): + # Close the connection only if it was created in this process, + # not if this object is being GC'd after fork. + if self._procpid == getpid(): + self.finish() + + @classmethod + def connect(cls, conninfo: bytes) -> PGconn: + return _connect(conninfo) + + @classmethod + def connect_start(cls, conninfo: bytes) -> PGconn: + return _connect_start(conninfo) + + def connect_poll(self) -> PollingStatus: + cdef int rv = self._call_int(impl.PQconnectPoll) + return PollingStatus(rv) + + def finish(self) -> None: + if self.pgconn_ptr is not NULL: + impl.PQfinish(self.pgconn_ptr) + self.pgconn_ptr = NULL + + @property + def pgconn_ptr(self) -> Optional[int]: + if self.pgconn_ptr: + return self.pgconn_ptr + else: + return None + + @property + def info(self) -> List["ConninfoOption"]: + self._ensure_pgconn() + cdef impl.PQconninfoOption *opts = impl.PQconninfo(self.pgconn_ptr) + if opts is NULL: + raise MemoryError("couldn't allocate connection info") + rv = _options_from_array(opts) + impl.PQconninfoFree(opts) + return rv + + def reset(self) -> None: + self._ensure_pgconn() + impl.PQreset(self.pgconn_ptr) + + def reset_start(self) -> None: + if not impl.PQresetStart(self.pgconn_ptr): + raise PQerror("couldn't reset connection") + + def reset_poll(self) -> PollingStatus: + cdef int rv = self._call_int(impl.PQresetPoll) + return PollingStatus(rv) + + @classmethod + def ping(self, conninfo: bytes) -> Ping: + cdef int rv = impl.PQping(conninfo) + return Ping(rv) + + @property + def db(self) -> bytes: + return self._call_bytes(impl.PQdb) + + @property + def user(self) -> bytes: + return self._call_bytes(impl.PQuser) + + @property + def password(self) -> bytes: + return self._call_bytes(impl.PQpass) + + @property + def host(self) -> bytes: + return self._call_bytes(impl.PQhost) + + @property + def hostaddr(self) -> bytes: + return b'TODO' + + @property + def port(self) -> bytes: + return self._call_bytes(impl.PQport) + + @property + def tty(self) -> bytes: + return self._call_bytes(impl.PQtty) + + @property + def options(self) -> bytes: + return self._call_bytes(impl.PQoptions) + + @property + def status(self) -> ConnStatus: + cdef int rv = impl.PQstatus(self.pgconn_ptr) + return ConnStatus(rv) + + @property + def transaction_status(self) -> TransactionStatus: + cdef int rv = impl.PQtransactionStatus(self.pgconn_ptr) + return TransactionStatus(rv) + + def parameter_status(self, name: bytes) -> Optional[bytes]: + self._ensure_pgconn() + cdef const char *rv = impl.PQparameterStatus(self.pgconn_ptr, name) + if rv is not NULL: + return rv + else: + return None + + @property + def error_message(self) -> bytes: + return impl.PQerrorMessage(self.pgconn_ptr) + + @property + def protocol_version(self) -> int: + return self._call_int(impl.PQprotocolVersion) + + @property + def server_version(self) -> int: + return self._call_int(impl.PQserverVersion) + + @property + def socket(self) -> int: + return self._call_int(impl.PQsocket) + + @property + def backend_pid(self) -> int: + return self._call_int(impl.PQbackendPID) + + @property + def needs_password(self) -> bool: + return bool(self._call_int(impl.PQconnectionNeedsPassword)) + + @property + def used_password(self) -> bool: + return bool(self._call_int(impl.PQconnectionUsedPassword)) + + @property + def ssl_in_use(self) -> bool: + return bool(self._call_int(impl.PQsslInUse)) + + def exec_(self, command: bytes) -> PGresult: + self._ensure_pgconn() + cdef impl.PGresult *pgresult = impl.PQexec(self.pgconn_ptr, command) + if pgresult is NULL: + raise MemoryError("couldn't allocate PGresult") + + return PGresult._from_ptr(pgresult) + + def send_query(self, command: bytes) -> None: + self._ensure_pgconn() + if not impl.PQsendQuery(self.pgconn_ptr, command): + raise PQerror(f"sending query failed: {error_message(self)}") + + def exec_params( + self, + command: bytes, + param_values: Optional[Sequence[Optional[bytes]]], + param_types: Optional[Sequence[int]] = None, + param_formats: Optional[Sequence[Format]] = None, + result_format: Format = Format.TEXT, + ) -> PGresult: + self._ensure_pgconn() + + cdef int cnparams + cdef Oid *ctypes + cdef char *const *cvalues + cdef int *clenghts + cdef int *cformats + cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( + param_values, param_types, param_formats) + + cdef impl.PGresult *pgresult = impl.PQexecParams( + self.pgconn_ptr, command, cnparams, ctypes, + cvalues, clengths, cformats, result_format) + _clear_query_params(ctypes, cvalues, clengths, cformats) + if pgresult is NULL: + raise MemoryError("couldn't allocate PGresult") + return PGresult._from_ptr(pgresult) + + def send_query_params( + self, + command: bytes, + param_values: Optional[Sequence[Optional[bytes]]], + param_types: Optional[Sequence[int]] = None, + param_formats: Optional[Sequence[Format]] = None, + result_format: Format = Format.TEXT, + ) -> None: + self._ensure_pgconn() + + cdef int cnparams + cdef Oid *ctypes + cdef char *const *cvalues + cdef int *clenghts + cdef int *cformats + cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( + param_values, param_types, param_formats) + + cdef int rv = impl.PQsendQueryParams( + self.pgconn_ptr, command, cnparams, ctypes, + cvalues, + clengths, cformats, result_format) + _clear_query_params(ctypes, cvalues, clengths, cformats) + if not rv: + raise PQerror( + f"sending query and params failed: {error_message(self)}" + ) + + def send_prepare( + self, + name: bytes, + command: bytes, + param_types: Optional[Sequence[int]] = None, + ) -> None: + self._ensure_pgconn() + + cdef int i + cdef int nparams = len(param_types) if param_types else 0 + cdef Oid *atypes = NULL + if nparams: + atypes = PyMem_Malloc(nparams * sizeof(Oid)) + for i in range(nparams): + atypes[i] = param_types[i] + + cdef int rv = impl.PQsendPrepare( + self.pgconn_ptr, name, command, nparams, atypes + ) + PyMem_Free(atypes) + if not rv: + raise PQerror( + f"sending query and params failed: {error_message(self)}" + ) + + def send_query_prepared( + self, + name: bytes, + param_values: Optional[Sequence[Optional[bytes]]], + param_formats: Optional[Sequence[Format]] = None, + result_format: Format = Format.TEXT, + ) -> None: + self._ensure_pgconn() + + cdef int cnparams + cdef Oid *ctypes + cdef char *const *cvalues + cdef int *clenghts + cdef int *cformats + cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( + param_values, None, param_formats) + + cdef int rv = impl.PQsendQueryPrepared( + self.pgconn_ptr, name, cnparams, + cvalues, + clengths, cformats, result_format) + _clear_query_params(ctypes, cvalues, clengths, cformats) + if not rv: + raise PQerror( + f"sending prepared query failed: {error_message(self)}" + ) + + def prepare( + self, + name: bytes, + command: bytes, + param_types: Optional[Sequence[int]] = None, + ) -> PGresult: + self._ensure_pgconn() + + cdef int i + cdef int nparams = len(param_types) if param_types else 0 + cdef Oid *atypes = NULL + if nparams: + atypes = PyMem_Malloc(nparams * sizeof(Oid)) + for i in range(nparams): + atypes[i] = param_types[i] + + cdef impl.PGresult *rv = impl.PQprepare( + self.pgconn_ptr, name, command, nparams, atypes) + PyMem_Free(atypes) + if rv is NULL: + raise MemoryError("couldn't allocate PGresult") + return PGresult._from_ptr(rv) + + def exec_prepared( + self, + name: bytes, + param_values: Optional[Sequence[bytes]], + param_formats: Optional[Sequence[int]] = None, + result_format: int = 0, + ) -> PGresult: + self._ensure_pgconn() + + cdef int cnparams + cdef Oid *ctypes + cdef char *const *cvalues + cdef int *clenghts + cdef int *cformats + cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( + param_values, None, param_formats) + + cdef impl.PGresult *rv = impl.PQexecPrepared( + self.pgconn_ptr, name, cnparams, + cvalues, + clengths, cformats, result_format) + + _clear_query_params(ctypes, cvalues, clengths, cformats) + if rv is NULL: + raise MemoryError("couldn't allocate PGresult") + return PGresult._from_ptr(rv) + + def describe_prepared(self, name: bytes) -> PGresult: + self._ensure_pgconn() + cdef impl.PGresult *rv = impl.PQdescribePrepared(self.pgconn_ptr, name) + if rv is NULL: + raise MemoryError("couldn't allocate PGresult") + return PGresult._from_ptr(rv) + + def describe_portal(self, name: bytes) -> PGresult: + self._ensure_pgconn() + cdef impl.PGresult *rv = impl.PQdescribePortal(self.pgconn_ptr, name) + if rv is NULL: + raise MemoryError("couldn't allocate PGresult") + return PGresult._from_ptr(rv) + + def get_result(self) -> Optional["PGresult"]: + cdef impl.PGresult *pgresult = impl.PQgetResult(self.pgconn_ptr) + if pgresult is NULL: + return None + return PGresult._from_ptr(pgresult) + + def consume_input(self) -> None: + if 1 != impl.PQconsumeInput(self.pgconn_ptr): + raise PQerror(f"consuming input failed: {error_message(self)}") + + def is_busy(self) -> int: + cdef int rv + with nogil: + rv = impl.PQisBusy(self.pgconn_ptr) + return rv + + @property + def nonblocking(self) -> int: + return impl.PQisnonblocking(self.pgconn_ptr) + + @nonblocking.setter + def nonblocking(self, arg: int) -> None: + if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg): + raise PQerror(f"setting nonblocking failed: {error_message(self)}") + + def flush(self) -> int: + cdef int rv = impl.PQflush(self.pgconn_ptr) + if rv < 0: + raise PQerror(f"flushing failed:{error_message(self)}") + return rv + + def get_cancel(self) -> PGcancel: + cdef impl.PGcancel *ptr = impl.PQgetCancel(self.pgconn_ptr) + if not ptr: + raise PQerror("couldn't create cancel object") + return PGcancel._from_ptr(ptr) + + def notifies(self) -> Optional[PGnotify]: + cdef impl.PGnotify *ptr + with nogil: + ptr = impl.PQnotifies(self.pgconn_ptr) + if ptr: + ret = PGnotify(ptr.relname, ptr.be_pid, ptr.extra) + impl.PQfreemem(ptr) + return ret + else: + return None + + def put_copy_data(self, buffer: bytes) -> int: + cdef int rv + cdef const char *cbuffer = PyBytes_AsString(buffer) + cdef int length = len(buffer) + rv = impl.PQputCopyData(self.pgconn_ptr, cbuffer, length) + if rv < 0: + raise PQerror(f"sending copy data failed: {error_message(self)}") + return rv + + def put_copy_end(self, error: Optional[bytes] = None) -> int: + cdef int rv + cdef const char *cerr = NULL + if error is not None: + cerr = PyBytes_AsString(error) + rv = impl.PQputCopyEnd(self.pgconn_ptr, cerr) + if rv < 0: + raise PQerror(f"sending copy end failed: {error_message(self)}") + return rv + + def get_copy_data(self, async_: int) -> Tuple[int, bytes]: + cdef char *buffer_ptr = NULL + cdef int nbytes + nbytes = impl.PQgetCopyData(self.pgconn_ptr, &buffer_ptr, async_) + if nbytes == -2: + raise PQerror(f"receiving copy data failed: {error_message(self)}") + if buffer_ptr is not NULL: + # TODO: do it without copy + data = buffer_ptr[:nbytes] + impl.PQfreemem(buffer_ptr) + return nbytes, data + else: + return nbytes, b"" + + def make_empty_result(self, exec_status: ExecStatus) -> PGresult: + cdef impl.PGresult *rv = impl.PQmakeEmptyPGresult( + self.pgconn_ptr, exec_status) + if not rv: + raise MemoryError("couldn't allocate empty PGresult") + return PGresult._from_ptr(rv) + + cdef int _ensure_pgconn(self) except 0: + if self.pgconn_ptr is not NULL: + return 1 + + raise PQerror("the connection is closed") + + cdef char *_call_bytes(self, conn_bytes_f func) except NULL: + """ + Call one of the pgconn libpq functions returning a bytes pointer. + """ + if not self._ensure_pgconn(): + return NULL + cdef char *rv = func(self.pgconn_ptr) + assert rv is not NULL + return rv + + cdef int _call_int(self, conn_int_f func) except -1: + """ + Call one of the pgconn libpq functions returning an int. + """ + if not self._ensure_pgconn(): + return -1 + return func(self.pgconn_ptr) + + +cdef PGconn _connect(const char *conninfo): + cdef impl.PGconn* pgconn = impl.PQconnectdb(conninfo) + if not pgconn: + raise MemoryError("couldn't allocate PGconn") + + return PGconn._from_ptr(pgconn) + + +cdef PGconn _connect_start(const char *conninfo): + cdef impl.PGconn* pgconn = impl.PQconnectStart(conninfo) + if not pgconn: + raise MemoryError("couldn't allocate PGconn") + + return PGconn._from_ptr(pgconn) diff --git a/psycopg3_c/psycopg3_c/pq_cython.pyx b/psycopg3_c/psycopg3_c/pq_cython.pyx index 444be18a4..26abe2c29 100644 --- a/psycopg3_c/psycopg3_c/pq_cython.pyx +++ b/psycopg3_c/psycopg3_c/pq_cython.pyx @@ -55,469 +55,7 @@ cdef void notice_receiver(void *arg, const impl.PGresult *res_ptr) with gil: res.pgresult_ptr = NULL # avoid destroying the pgresult_ptr -cdef class PGconn: - @staticmethod - cdef PGconn _from_ptr(impl.PGconn *ptr): - cdef PGconn rv = PGconn.__new__(PGconn) - rv.pgconn_ptr = ptr - - impl.PQsetNoticeReceiver(ptr, notice_receiver, rv) - return rv - - def __cinit__(self): - self.pgconn_ptr = NULL - self._procpid = getpid() - - def __dealloc__(self): - # Close the connection only if it was created in this process, - # not if this object is being GC'd after fork. - if self._procpid == getpid(): - self.finish() - - @classmethod - def connect(cls, conninfo: bytes) -> PGconn: - return _connect(conninfo) - - @classmethod - def connect_start(cls, conninfo: bytes) -> PGconn: - return _connect_start(conninfo) - - def connect_poll(self) -> PollingStatus: - cdef int rv = self._call_int(impl.PQconnectPoll) - return PollingStatus(rv) - - def finish(self) -> None: - if self.pgconn_ptr is not NULL: - impl.PQfinish(self.pgconn_ptr) - self.pgconn_ptr = NULL - - @property - def pgconn_ptr(self) -> Optional[int]: - if self.pgconn_ptr: - return self.pgconn_ptr - else: - return None - - @property - def info(self) -> List["ConninfoOption"]: - self._ensure_pgconn() - cdef impl.PQconninfoOption *opts = impl.PQconninfo(self.pgconn_ptr) - if opts is NULL: - raise MemoryError("couldn't allocate connection info") - rv = _options_from_array(opts) - impl.PQconninfoFree(opts) - return rv - - def reset(self) -> None: - self._ensure_pgconn() - impl.PQreset(self.pgconn_ptr) - - def reset_start(self) -> None: - if not impl.PQresetStart(self.pgconn_ptr): - raise PQerror("couldn't reset connection") - - def reset_poll(self) -> PollingStatus: - cdef int rv = self._call_int(impl.PQresetPoll) - return PollingStatus(rv) - - @classmethod - def ping(self, conninfo: bytes) -> Ping: - cdef int rv = impl.PQping(conninfo) - return Ping(rv) - - @property - def db(self) -> bytes: - return self._call_bytes(impl.PQdb) - - @property - def user(self) -> bytes: - return self._call_bytes(impl.PQuser) - - @property - def password(self) -> bytes: - return self._call_bytes(impl.PQpass) - - @property - def host(self) -> bytes: - return self._call_bytes(impl.PQhost) - - @property - def hostaddr(self) -> bytes: - return b'TODO' - - @property - def port(self) -> bytes: - return self._call_bytes(impl.PQport) - - @property - def tty(self) -> bytes: - return self._call_bytes(impl.PQtty) - - @property - def options(self) -> bytes: - return self._call_bytes(impl.PQoptions) - - @property - def status(self) -> ConnStatus: - cdef int rv = impl.PQstatus(self.pgconn_ptr) - return ConnStatus(rv) - - @property - def transaction_status(self) -> TransactionStatus: - cdef int rv = impl.PQtransactionStatus(self.pgconn_ptr) - return TransactionStatus(rv) - - def parameter_status(self, name: bytes) -> Optional[bytes]: - self._ensure_pgconn() - cdef const char *rv = impl.PQparameterStatus(self.pgconn_ptr, name) - if rv is not NULL: - return rv - else: - return None - - @property - def error_message(self) -> bytes: - return impl.PQerrorMessage(self.pgconn_ptr) - - @property - def protocol_version(self) -> int: - return self._call_int(impl.PQprotocolVersion) - - @property - def server_version(self) -> int: - return self._call_int(impl.PQserverVersion) - - @property - def socket(self) -> int: - return self._call_int(impl.PQsocket) - - @property - def backend_pid(self) -> int: - return self._call_int(impl.PQbackendPID) - - @property - def needs_password(self) -> bool: - return bool(self._call_int(impl.PQconnectionNeedsPassword)) - - @property - def used_password(self) -> bool: - return bool(self._call_int(impl.PQconnectionUsedPassword)) - - @property - def ssl_in_use(self) -> bool: - return bool(self._call_int(impl.PQsslInUse)) - - def exec_(self, command: bytes) -> PGresult: - self._ensure_pgconn() - cdef impl.PGresult *pgresult = impl.PQexec(self.pgconn_ptr, command) - if pgresult is NULL: - raise MemoryError("couldn't allocate PGresult") - - return PGresult._from_ptr(pgresult) - - def send_query(self, command: bytes) -> None: - self._ensure_pgconn() - if not impl.PQsendQuery(self.pgconn_ptr, command): - raise PQerror(f"sending query failed: {error_message(self)}") - - def exec_params( - self, - command: bytes, - param_values: Optional[Sequence[Optional[bytes]]], - param_types: Optional[Sequence[int]] = None, - param_formats: Optional[Sequence[Format]] = None, - result_format: Format = Format.TEXT, - ) -> PGresult: - self._ensure_pgconn() - - cdef int cnparams - cdef Oid *ctypes - cdef char *const *cvalues - cdef int *clenghts - cdef int *cformats - cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( - param_values, param_types, param_formats) - - cdef impl.PGresult *pgresult = impl.PQexecParams( - self.pgconn_ptr, command, cnparams, ctypes, - cvalues, clengths, cformats, result_format) - _clear_query_params(ctypes, cvalues, clengths, cformats) - if pgresult is NULL: - raise MemoryError("couldn't allocate PGresult") - return PGresult._from_ptr(pgresult) - - def send_query_params( - self, - command: bytes, - param_values: Optional[Sequence[Optional[bytes]]], - param_types: Optional[Sequence[int]] = None, - param_formats: Optional[Sequence[Format]] = None, - result_format: Format = Format.TEXT, - ) -> None: - self._ensure_pgconn() - - cdef int cnparams - cdef Oid *ctypes - cdef char *const *cvalues - cdef int *clenghts - cdef int *cformats - cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( - param_values, param_types, param_formats) - - cdef int rv = impl.PQsendQueryParams( - self.pgconn_ptr, command, cnparams, ctypes, - cvalues, - clengths, cformats, result_format) - _clear_query_params(ctypes, cvalues, clengths, cformats) - if not rv: - raise PQerror( - f"sending query and params failed: {error_message(self)}" - ) - - def send_prepare( - self, - name: bytes, - command: bytes, - param_types: Optional[Sequence[int]] = None, - ) -> None: - self._ensure_pgconn() - - cdef int i - cdef int nparams = len(param_types) if param_types else 0 - cdef Oid *atypes = NULL - if nparams: - atypes = PyMem_Malloc(nparams * sizeof(Oid)) - for i in range(nparams): - atypes[i] = param_types[i] - - cdef int rv = impl.PQsendPrepare( - self.pgconn_ptr, name, command, nparams, atypes - ) - PyMem_Free(atypes) - if not rv: - raise PQerror( - f"sending query and params failed: {error_message(self)}" - ) - - def send_query_prepared( - self, - name: bytes, - param_values: Optional[Sequence[Optional[bytes]]], - param_formats: Optional[Sequence[Format]] = None, - result_format: Format = Format.TEXT, - ) -> None: - self._ensure_pgconn() - - cdef int cnparams - cdef Oid *ctypes - cdef char *const *cvalues - cdef int *clenghts - cdef int *cformats - cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( - param_values, None, param_formats) - - cdef int rv = impl.PQsendQueryPrepared( - self.pgconn_ptr, name, cnparams, - cvalues, - clengths, cformats, result_format) - _clear_query_params(ctypes, cvalues, clengths, cformats) - if not rv: - raise PQerror( - f"sending prepared query failed: {error_message(self)}" - ) - - def prepare( - self, - name: bytes, - command: bytes, - param_types: Optional[Sequence[int]] = None, - ) -> PGresult: - self._ensure_pgconn() - - cdef int i - cdef int nparams = len(param_types) if param_types else 0 - cdef Oid *atypes = NULL - if nparams: - atypes = PyMem_Malloc(nparams * sizeof(Oid)) - for i in range(nparams): - atypes[i] = param_types[i] - - cdef impl.PGresult *rv = impl.PQprepare( - self.pgconn_ptr, name, command, nparams, atypes) - PyMem_Free(atypes) - if rv is NULL: - raise MemoryError("couldn't allocate PGresult") - return PGresult._from_ptr(rv) - - def exec_prepared( - self, - name: bytes, - param_values: Optional[Sequence[bytes]], - param_formats: Optional[Sequence[int]] = None, - result_format: int = 0, - ) -> PGresult: - self._ensure_pgconn() - - cdef int cnparams - cdef Oid *ctypes - cdef char *const *cvalues - cdef int *clenghts - cdef int *cformats - cnparams, ctypes, cvalues, clengths, cformats = _query_params_args( - param_values, None, param_formats) - - cdef impl.PGresult *rv = impl.PQexecPrepared( - self.pgconn_ptr, name, cnparams, - cvalues, - clengths, cformats, result_format) - - _clear_query_params(ctypes, cvalues, clengths, cformats) - if rv is NULL: - raise MemoryError("couldn't allocate PGresult") - return PGresult._from_ptr(rv) - - def describe_prepared(self, name: bytes) -> PGresult: - self._ensure_pgconn() - cdef impl.PGresult *rv = impl.PQdescribePrepared(self.pgconn_ptr, name) - if rv is NULL: - raise MemoryError("couldn't allocate PGresult") - return PGresult._from_ptr(rv) - - def describe_portal(self, name: bytes) -> PGresult: - self._ensure_pgconn() - cdef impl.PGresult *rv = impl.PQdescribePortal(self.pgconn_ptr, name) - if rv is NULL: - raise MemoryError("couldn't allocate PGresult") - return PGresult._from_ptr(rv) - - def get_result(self) -> Optional["PGresult"]: - cdef impl.PGresult *pgresult = impl.PQgetResult(self.pgconn_ptr) - if pgresult is NULL: - return None - return PGresult._from_ptr(pgresult) - - def consume_input(self) -> None: - if 1 != impl.PQconsumeInput(self.pgconn_ptr): - raise PQerror(f"consuming input failed: {error_message(self)}") - - def is_busy(self) -> int: - cdef int rv - with nogil: - rv = impl.PQisBusy(self.pgconn_ptr) - return rv - - @property - def nonblocking(self) -> int: - return impl.PQisnonblocking(self.pgconn_ptr) - - @nonblocking.setter - def nonblocking(self, arg: int) -> None: - if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg): - raise PQerror(f"setting nonblocking failed: {error_message(self)}") - - def flush(self) -> int: - cdef int rv = impl.PQflush(self.pgconn_ptr) - if rv < 0: - raise PQerror(f"flushing failed:{error_message(self)}") - return rv - - def get_cancel(self) -> PGcancel: - cdef impl.PGcancel *ptr = impl.PQgetCancel(self.pgconn_ptr) - if not ptr: - raise PQerror("couldn't create cancel object") - return PGcancel._from_ptr(ptr) - - def notifies(self) -> Optional[PGnotify]: - cdef impl.PGnotify *ptr - with nogil: - ptr = impl.PQnotifies(self.pgconn_ptr) - - if ptr: - ret = PGnotify(ptr.relname, ptr.be_pid, ptr.extra) - impl.PQfreemem(ptr) - return ret - else: - return None - - def put_copy_data(self, buffer: bytes) -> int: - cdef int rv - cdef const char *cbuffer = PyBytes_AsString(buffer) - cdef int length = len(buffer) - rv = impl.PQputCopyData(self.pgconn_ptr, cbuffer, length) - if rv < 0: - raise PQerror(f"sending copy data failed: {error_message(self)}") - return rv - - def put_copy_end(self, error: Optional[bytes] = None) -> int: - cdef int rv - cdef const char *cerr = NULL - if error is not None: - cerr = PyBytes_AsString(error) - rv = impl.PQputCopyEnd(self.pgconn_ptr, cerr) - if rv < 0: - raise PQerror(f"sending copy end failed: {error_message(self)}") - return rv - - def get_copy_data(self, async_: int) -> Tuple[int, bytes]: - cdef char *buffer_ptr = NULL - cdef int nbytes - nbytes = impl.PQgetCopyData(self.pgconn_ptr, &buffer_ptr, async_) - if nbytes == -2: - raise PQerror(f"receiving copy data failed: {error_message(self)}") - if buffer_ptr is not NULL: - # TODO: do it without copy - data = buffer_ptr[:nbytes] - impl.PQfreemem(buffer_ptr) - return nbytes, data - else: - return nbytes, b"" - - def make_empty_result(self, exec_status: ExecStatus) -> PGresult: - cdef impl.PGresult *rv = impl.PQmakeEmptyPGresult( - self.pgconn_ptr, exec_status) - if not rv: - raise MemoryError("couldn't allocate empty PGresult") - return PGresult._from_ptr(rv) - - cdef int _ensure_pgconn(self) except 0: - if self.pgconn_ptr is not NULL: - return 1 - - raise PQerror("the connection is closed") - - cdef char *_call_bytes(self, conn_bytes_f func) except NULL: - """ - Call one of the pgconn libpq functions returning a bytes pointer. - """ - if not self._ensure_pgconn(): - return NULL - cdef char *rv = func(self.pgconn_ptr) - assert rv is not NULL - return rv - - cdef int _call_int(self, conn_int_f func) except -1: - """ - Call one of the pgconn libpq functions returning an int. - """ - if not self._ensure_pgconn(): - return -1 - return func(self.pgconn_ptr) - - -cdef PGconn _connect(const char *conninfo): - cdef impl.PGconn* pgconn = impl.PQconnectdb(conninfo) - if not pgconn: - raise MemoryError("couldn't allocate PGconn") - - return PGconn._from_ptr(pgconn) - - -cdef PGconn _connect_start(const char *conninfo): - cdef impl.PGconn* pgconn = impl.PQconnectStart(conninfo) - if not pgconn: - raise MemoryError("couldn't allocate PGconn") - - return PGconn._from_ptr(pgconn) +include "pq/pgconn.pyx" cdef (int, Oid *, char * const*, int *, int *) _query_params_args(