]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Added stub for the pq package
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 12 Apr 2020 13:52:30 +0000 (01:52 +1200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 14 Apr 2020 06:50:20 +0000 (18:50 +1200)
Needed as the dynamic import of the modle confuses mypy

psycopg3/pq/__init__.py
psycopg3/pq/__init__.pyi [new file with mode: 0644]
psycopg3/pq/pq_ctypes.py

index 70223a342524dee70220e3e4a6efca1c40b2df24..c7816b9256ebedb7e0da1b7487bda6e876877caa 100644 (file)
@@ -9,6 +9,8 @@ implementation-dependant but all the implementations share the same interface.
 
 # Copyright (C) 2020 The Psycopg Team
 
+from types import ModuleType
+
 from .enums import (
     ConnStatus,
     PollingStatus,
@@ -21,7 +23,17 @@ from .enums import (
 from .encodings import py_codecs
 from .misc import error_message, ConninfoOption
 
-from . import pq_ctypes as pq_module
+
+def import_libpq() -> ModuleType:
+    """
+    Find the best libpw wrapper available.
+    """
+    from . import pq_ctypes
+
+    return pq_ctypes
+
+
+pq_module = import_libpq()
 
 version = pq_module.version
 PGconn = pq_module.PGconn
diff --git a/psycopg3/pq/__init__.pyi b/psycopg3/pq/__init__.pyi
new file mode 100644 (file)
index 0000000..008a0ac
--- /dev/null
@@ -0,0 +1,202 @@
+"""
+Public interface for the psycopg3.pq module
+
+This is provided as a stub because the implementation can be different.
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from typing import Any, List, Optional, Sequence
+
+from .enums import (
+    ConnStatus,
+    PollingStatus,
+    ExecStatus,
+    TransactionStatus,
+    Ping,
+    DiagnosticField,
+    Format,
+)
+from .misc import error_message, ConninfoOption
+from .encodings import py_codecs
+from ..errors import OperationalError
+
+def version() -> int: ...
+
+class PQerror(OperationalError): ...
+
+class PGconn:
+    def __init__(self, pgconn_ptr: Any): ...
+    def __del__(self) -> None: ...
+    @classmethod
+    def connect(cls, conninfo: bytes) -> "PGconn": ...
+    @classmethod
+    def connect_start(cls, conninfo: bytes) -> "PGconn": ...
+    def connect_poll(self) -> PollingStatus: ...
+    def finish(self) -> None: ...
+    @property
+    def info(self) -> List["ConninfoOption"]: ...
+    def reset(self) -> None: ...
+    def reset_start(self) -> None: ...
+    def reset_poll(self) -> PollingStatus: ...
+    @classmethod
+    def ping(self, conninfo: bytes) -> Ping: ...
+    @property
+    def db(self) -> bytes: ...
+    @property
+    def user(self) -> bytes: ...
+    @property
+    def password(self) -> bytes: ...
+    @property
+    def host(self) -> bytes: ...
+    @property
+    def hostaddr(self) -> bytes: ...
+    @property
+    def port(self) -> bytes: ...
+    @property
+    def tty(self) -> bytes: ...
+    @property
+    def options(self) -> bytes: ...
+    @property
+    def status(self) -> ConnStatus: ...
+    @property
+    def transaction_status(self) -> TransactionStatus: ...
+    def parameter_status(self, name: bytes) -> Optional[bytes]: ...
+    @property
+    def error_message(self) -> bytes: ...
+    @property
+    def protocol_version(self) -> int: ...
+    @property
+    def server_version(self) -> int: ...
+    @property
+    def socket(self) -> int: ...
+    @property
+    def backend_pid(self) -> int: ...
+    @property
+    def needs_password(self) -> bool: ...
+    @property
+    def used_password(self) -> bool: ...
+    @property
+    def ssl_in_use(self) -> bool: ...
+    def exec_(self, command: bytes) -> "PGresult": ...
+    def send_query(self, command: bytes) -> None: ...
+    def exec_params(
+        self,
+        command: bytes,
+        param_values: Optional[Sequence[Optional[bytes]]],
+        param_types: Optional[Sequence[int]] = None,
+        param_formats: Optional[Sequence[Format]] = None,
+        result_format: Format = Format.TEXT,
+    ) -> "PGresult": ...
+    def send_query_params(
+        self,
+        command: bytes,
+        param_values: Optional[Sequence[Optional[bytes]]],
+        param_types: Optional[Sequence[int]] = None,
+        param_formats: Optional[Sequence[Format]] = None,
+        result_format: Format = Format.TEXT,
+    ) -> None: ...
+    def send_prepare(
+        self,
+        name: bytes,
+        command: bytes,
+        param_types: Optional[Sequence[int]] = None,
+    ) -> None: ...
+    def send_query_prepared(
+        self,
+        name: bytes,
+        param_values: Optional[Sequence[Optional[bytes]]],
+        param_formats: Optional[Sequence[Format]] = None,
+        result_format: Format = Format.TEXT,
+    ) -> None: ...
+    def prepare(
+        self,
+        name: bytes,
+        command: bytes,
+        param_types: Optional[Sequence[int]] = None,
+    ) -> "PGresult": ...
+    def exec_prepared(
+        self,
+        name: bytes,
+        param_values: Optional[Sequence[bytes]],
+        param_formats: Optional[Sequence[int]] = None,
+        result_format: int = 0,
+    ) -> "PGresult": ...
+    def describe_prepared(self, name: bytes) -> "PGresult": ...
+    def describe_portal(self, name: bytes) -> "PGresult": ...
+    def get_result(self) -> Optional["PGresult"]: ...
+    def consume_input(self) -> None: ...
+    def is_busy(self) -> int: ...
+    @property
+    def nonblocking(self) -> int: ...
+    @nonblocking.setter
+    def nonblocking(self, arg: int) -> None: ...
+    def flush(self) -> int: ...
+    def make_empty_result(self, exec_status: ExecStatus) -> "PGresult": ...
+
+class PGresult:
+    def __init__(self, pgresult_ptr: Any): ...
+    def __del__(self) -> None: ...
+    def clear(self) -> None: ...
+    @property
+    def status(self) -> ExecStatus: ...
+    @property
+    def error_message(self) -> bytes: ...
+    def error_field(self, fieldcode: DiagnosticField) -> Optional[bytes]: ...
+    @property
+    def ntuples(self) -> int: ...
+    @property
+    def nfields(self) -> int: ...
+    def fname(self, column_number: int) -> Optional[bytes]: ...
+    def ftable(self, column_number: int) -> int: ...
+    def ftablecol(self, column_number: int) -> int: ...
+    def fformat(self, column_number: int) -> Format: ...
+    def ftype(self, column_number: int) -> int: ...
+    def fmod(self, column_number: int) -> int: ...
+    def fsize(self, column_number: int) -> int: ...
+    @property
+    def binary_tuples(self) -> Format: ...
+    def get_value(
+        self, row_number: int, column_number: int
+    ) -> Optional[bytes]: ...
+    @property
+    def nparams(self) -> int: ...
+    def param_type(self, param_number: int) -> int: ...
+    @property
+    def command_status(self) -> Optional[bytes]: ...
+    @property
+    def command_tuples(self) -> Optional[int]: ...
+    @property
+    def oid_value(self) -> int: ...
+
+class Conninfo:
+    @classmethod
+    def get_defaults(cls) -> List[ConninfoOption]: ...
+    @classmethod
+    def parse(cls, conninfo: bytes) -> List[ConninfoOption]: ...
+    @classmethod
+    def _options_from_array(
+        cls, opts: Sequence[Any]
+    ) -> List[ConninfoOption]: ...
+
+class Escaping:
+    def __init__(self, conn: Optional[PGconn] = None): ...
+    def escape_bytea(self, data: bytes) -> bytes: ...
+    def unescape_bytea(self, data: bytes) -> bytes: ...
+
+__all__ = (
+    "ConnStatus",
+    "PollingStatus",
+    "TransactionStatus",
+    "ExecStatus",
+    "Ping",
+    "DiagnosticField",
+    "Format",
+    "PGconn",
+    "Conninfo",
+    "PQerror",
+    "error_message",
+    "ConninfoOption",
+    "py_codecs",
+    "version",
+)
index ae0ce825318954330fc3f2e98612bb27b5553352..35539c72035297ed62c0f1563250c9f4ba36338b 100644 (file)
@@ -11,7 +11,7 @@ implementation.
 from ctypes import Array, pointer, string_at
 from ctypes import c_char_p, c_int, c_size_t, c_ulong
 from typing import Any, Callable, List, Optional, Sequence
-from typing import cast as t_cast
+from typing import cast as t_cast, TYPE_CHECKING
 
 from .enums import (
     ConnStatus,
@@ -26,6 +26,9 @@ from .misc import error_message, ConninfoOption
 from . import _pq_ctypes as impl
 from ..errors import OperationalError
 
+if TYPE_CHECKING:
+    from psycopg3 import pq  # noqa
+
 
 def version() -> int:
     return impl.PQlibVersion()
@@ -196,7 +199,10 @@ class PGconn:
             raise TypeError(f"bytes expected, got {type(command)} instead")
         self._ensure_pgconn()
         if not impl.PQsendQuery(self.pgconn_ptr, command):
-            raise PQerror(f"sending query failed: {error_message(self)}")
+            raise PQerror(
+                "sending query failed:"
+                f" {error_message(t_cast('pq.PGconn', self))}"
+            )
 
     def exec_params(
         self,
@@ -229,7 +235,8 @@ class PGconn:
         self._ensure_pgconn()
         if not impl.PQsendQueryParams(*args):
             raise PQerror(
-                f"sending query and params failed: {error_message(self)}"
+                "sending query and params failed:"
+                f" {error_message(t_cast('pq.PGconn', self))}"
             )
 
     def send_prepare(
@@ -251,7 +258,8 @@ class PGconn:
             self.pgconn_ptr, name, command, nparams, atypes
         ):
             raise PQerror(
-                f"sending query and params failed: {error_message(self)}"
+                "sending query and params failed:"
+                f" {error_message(t_cast('pq.PGconn', self))}"
             )
 
     def send_query_prepared(
@@ -271,7 +279,8 @@ class PGconn:
         self._ensure_pgconn()
         if not impl.PQsendQueryPrepared(*args):
             raise PQerror(
-                f"sending prepared query failed: {error_message(self)}"
+                "sending prepared query failed:"
+                f" {error_message(t_cast('pq.PGconn', self))}"
             )
 
     def _query_params_args(
@@ -420,7 +429,10 @@ class PGconn:
 
     def consume_input(self) -> None:
         if 1 != impl.PQconsumeInput(self.pgconn_ptr):
-            raise PQerror(f"consuming input failed: {error_message(self)}")
+            raise PQerror(
+                "consuming input failed:"
+                f" {error_message(t_cast('pq.PGconn', self))}"
+            )
 
     def is_busy(self) -> int:
         return impl.PQisBusy(self.pgconn_ptr)
@@ -432,12 +444,17 @@ class PGconn:
     @nonblocking.setter
     def nonblocking(self, arg: int) -> None:
         if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg):
-            raise PQerror(f"setting nonblocking failed: {error_message(self)}")
+            raise PQerror(
+                f"setting nonblocking failed:"
+                f" {error_message(t_cast('pq.PGconn', self))}"
+            )
 
     def flush(self) -> int:
         rv: int = impl.PQflush(self.pgconn_ptr)
         if rv < 0:
-            raise PQerror(f"flushing failed: {error_message(self)}")
+            raise PQerror(
+                f"flushing failed:{error_message(t_cast('pq.PGconn', self))}"
+            )
         return rv
 
     def make_empty_result(self, exec_status: ExecStatus) -> "PGresult":