From: Daniele Varrazzo Date: Sun, 12 Apr 2020 13:52:30 +0000 (+1200) Subject: Added stub for the pq package X-Git-Tag: 3.0.dev0~564 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=379e9b790e43e49bb08a4e2d053e071a10b428f4;p=thirdparty%2Fpsycopg.git Added stub for the pq package Needed as the dynamic import of the modle confuses mypy --- diff --git a/psycopg3/pq/__init__.py b/psycopg3/pq/__init__.py index 70223a342..c7816b925 100644 --- a/psycopg3/pq/__init__.py +++ b/psycopg3/pq/__init__.py @@ -9,6 +9,8 @@ implementation-dependant but all the implementations share the same interface. # Copyright (C) 2020 The Psycopg Team +from types import ModuleType + from .enums import ( ConnStatus, PollingStatus, @@ -21,7 +23,17 @@ from .enums import ( from .encodings import py_codecs from .misc import error_message, ConninfoOption -from . import pq_ctypes as pq_module + +def import_libpq() -> ModuleType: + """ + Find the best libpw wrapper available. + """ + from . import pq_ctypes + + return pq_ctypes + + +pq_module = import_libpq() version = pq_module.version PGconn = pq_module.PGconn diff --git a/psycopg3/pq/__init__.pyi b/psycopg3/pq/__init__.pyi new file mode 100644 index 000000000..008a0ac0d --- /dev/null +++ b/psycopg3/pq/__init__.pyi @@ -0,0 +1,202 @@ +""" +Public interface for the psycopg3.pq module + +This is provided as a stub because the implementation can be different. +""" + +# Copyright (C) 2020 The Psycopg Team + +from typing import Any, List, Optional, Sequence + +from .enums import ( + ConnStatus, + PollingStatus, + ExecStatus, + TransactionStatus, + Ping, + DiagnosticField, + Format, +) +from .misc import error_message, ConninfoOption +from .encodings import py_codecs +from ..errors import OperationalError + +def version() -> int: ... + +class PQerror(OperationalError): ... + +class PGconn: + def __init__(self, pgconn_ptr: Any): ... + def __del__(self) -> None: ... + @classmethod + def connect(cls, conninfo: bytes) -> "PGconn": ... + @classmethod + def connect_start(cls, conninfo: bytes) -> "PGconn": ... + def connect_poll(self) -> PollingStatus: ... + def finish(self) -> None: ... + @property + def info(self) -> List["ConninfoOption"]: ... + def reset(self) -> None: ... + def reset_start(self) -> None: ... + def reset_poll(self) -> PollingStatus: ... + @classmethod + def ping(self, conninfo: bytes) -> Ping: ... + @property + def db(self) -> bytes: ... + @property + def user(self) -> bytes: ... + @property + def password(self) -> bytes: ... + @property + def host(self) -> bytes: ... + @property + def hostaddr(self) -> bytes: ... + @property + def port(self) -> bytes: ... + @property + def tty(self) -> bytes: ... + @property + def options(self) -> bytes: ... + @property + def status(self) -> ConnStatus: ... + @property + def transaction_status(self) -> TransactionStatus: ... + def parameter_status(self, name: bytes) -> Optional[bytes]: ... + @property + def error_message(self) -> bytes: ... + @property + def protocol_version(self) -> int: ... + @property + def server_version(self) -> int: ... + @property + def socket(self) -> int: ... + @property + def backend_pid(self) -> int: ... + @property + def needs_password(self) -> bool: ... + @property + def used_password(self) -> bool: ... + @property + def ssl_in_use(self) -> bool: ... + def exec_(self, command: bytes) -> "PGresult": ... + def send_query(self, command: bytes) -> None: ... + def exec_params( + self, + command: bytes, + param_values: Optional[Sequence[Optional[bytes]]], + param_types: Optional[Sequence[int]] = None, + param_formats: Optional[Sequence[Format]] = None, + result_format: Format = Format.TEXT, + ) -> "PGresult": ... + def send_query_params( + self, + command: bytes, + param_values: Optional[Sequence[Optional[bytes]]], + param_types: Optional[Sequence[int]] = None, + param_formats: Optional[Sequence[Format]] = None, + result_format: Format = Format.TEXT, + ) -> None: ... + def send_prepare( + self, + name: bytes, + command: bytes, + param_types: Optional[Sequence[int]] = None, + ) -> None: ... + def send_query_prepared( + self, + name: bytes, + param_values: Optional[Sequence[Optional[bytes]]], + param_formats: Optional[Sequence[Format]] = None, + result_format: Format = Format.TEXT, + ) -> None: ... + def prepare( + self, + name: bytes, + command: bytes, + param_types: Optional[Sequence[int]] = None, + ) -> "PGresult": ... + def exec_prepared( + self, + name: bytes, + param_values: Optional[Sequence[bytes]], + param_formats: Optional[Sequence[int]] = None, + result_format: int = 0, + ) -> "PGresult": ... + def describe_prepared(self, name: bytes) -> "PGresult": ... + def describe_portal(self, name: bytes) -> "PGresult": ... + def get_result(self) -> Optional["PGresult"]: ... + def consume_input(self) -> None: ... + def is_busy(self) -> int: ... + @property + def nonblocking(self) -> int: ... + @nonblocking.setter + def nonblocking(self, arg: int) -> None: ... + def flush(self) -> int: ... + def make_empty_result(self, exec_status: ExecStatus) -> "PGresult": ... + +class PGresult: + def __init__(self, pgresult_ptr: Any): ... + def __del__(self) -> None: ... + def clear(self) -> None: ... + @property + def status(self) -> ExecStatus: ... + @property + def error_message(self) -> bytes: ... + def error_field(self, fieldcode: DiagnosticField) -> Optional[bytes]: ... + @property + def ntuples(self) -> int: ... + @property + def nfields(self) -> int: ... + def fname(self, column_number: int) -> Optional[bytes]: ... + def ftable(self, column_number: int) -> int: ... + def ftablecol(self, column_number: int) -> int: ... + def fformat(self, column_number: int) -> Format: ... + def ftype(self, column_number: int) -> int: ... + def fmod(self, column_number: int) -> int: ... + def fsize(self, column_number: int) -> int: ... + @property + def binary_tuples(self) -> Format: ... + def get_value( + self, row_number: int, column_number: int + ) -> Optional[bytes]: ... + @property + def nparams(self) -> int: ... + def param_type(self, param_number: int) -> int: ... + @property + def command_status(self) -> Optional[bytes]: ... + @property + def command_tuples(self) -> Optional[int]: ... + @property + def oid_value(self) -> int: ... + +class Conninfo: + @classmethod + def get_defaults(cls) -> List[ConninfoOption]: ... + @classmethod + def parse(cls, conninfo: bytes) -> List[ConninfoOption]: ... + @classmethod + def _options_from_array( + cls, opts: Sequence[Any] + ) -> List[ConninfoOption]: ... + +class Escaping: + def __init__(self, conn: Optional[PGconn] = None): ... + def escape_bytea(self, data: bytes) -> bytes: ... + def unescape_bytea(self, data: bytes) -> bytes: ... + +__all__ = ( + "ConnStatus", + "PollingStatus", + "TransactionStatus", + "ExecStatus", + "Ping", + "DiagnosticField", + "Format", + "PGconn", + "Conninfo", + "PQerror", + "error_message", + "ConninfoOption", + "py_codecs", + "version", +) diff --git a/psycopg3/pq/pq_ctypes.py b/psycopg3/pq/pq_ctypes.py index ae0ce8253..35539c720 100644 --- a/psycopg3/pq/pq_ctypes.py +++ b/psycopg3/pq/pq_ctypes.py @@ -11,7 +11,7 @@ implementation. from ctypes import Array, pointer, string_at from ctypes import c_char_p, c_int, c_size_t, c_ulong from typing import Any, Callable, List, Optional, Sequence -from typing import cast as t_cast +from typing import cast as t_cast, TYPE_CHECKING from .enums import ( ConnStatus, @@ -26,6 +26,9 @@ from .misc import error_message, ConninfoOption from . import _pq_ctypes as impl from ..errors import OperationalError +if TYPE_CHECKING: + from psycopg3 import pq # noqa + def version() -> int: return impl.PQlibVersion() @@ -196,7 +199,10 @@ class PGconn: raise TypeError(f"bytes expected, got {type(command)} instead") self._ensure_pgconn() if not impl.PQsendQuery(self.pgconn_ptr, command): - raise PQerror(f"sending query failed: {error_message(self)}") + raise PQerror( + "sending query failed:" + f" {error_message(t_cast('pq.PGconn', self))}" + ) def exec_params( self, @@ -229,7 +235,8 @@ class PGconn: self._ensure_pgconn() if not impl.PQsendQueryParams(*args): raise PQerror( - f"sending query and params failed: {error_message(self)}" + "sending query and params failed:" + f" {error_message(t_cast('pq.PGconn', self))}" ) def send_prepare( @@ -251,7 +258,8 @@ class PGconn: self.pgconn_ptr, name, command, nparams, atypes ): raise PQerror( - f"sending query and params failed: {error_message(self)}" + "sending query and params failed:" + f" {error_message(t_cast('pq.PGconn', self))}" ) def send_query_prepared( @@ -271,7 +279,8 @@ class PGconn: self._ensure_pgconn() if not impl.PQsendQueryPrepared(*args): raise PQerror( - f"sending prepared query failed: {error_message(self)}" + "sending prepared query failed:" + f" {error_message(t_cast('pq.PGconn', self))}" ) def _query_params_args( @@ -420,7 +429,10 @@ class PGconn: def consume_input(self) -> None: if 1 != impl.PQconsumeInput(self.pgconn_ptr): - raise PQerror(f"consuming input failed: {error_message(self)}") + raise PQerror( + "consuming input failed:" + f" {error_message(t_cast('pq.PGconn', self))}" + ) def is_busy(self) -> int: return impl.PQisBusy(self.pgconn_ptr) @@ -432,12 +444,17 @@ class PGconn: @nonblocking.setter def nonblocking(self, arg: int) -> None: if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg): - raise PQerror(f"setting nonblocking failed: {error_message(self)}") + raise PQerror( + f"setting nonblocking failed:" + f" {error_message(t_cast('pq.PGconn', self))}" + ) def flush(self) -> int: rv: int = impl.PQflush(self.pgconn_ptr) if rv < 0: - raise PQerror(f"flushing failed: {error_message(self)}") + raise PQerror( + f"flushing failed:{error_message(t_cast('pq.PGconn', self))}" + ) return rv def make_empty_result(self, exec_status: ExecStatus) -> "PGresult":