--- /dev/null
+"""
+Public interface for the psycopg3.pq module
+
+This is provided as a stub because the implementation can be different.
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from typing import Any, List, Optional, Sequence
+
+from .enums import (
+ ConnStatus,
+ PollingStatus,
+ ExecStatus,
+ TransactionStatus,
+ Ping,
+ DiagnosticField,
+ Format,
+)
+from .misc import error_message, ConninfoOption
+from .encodings import py_codecs
+from ..errors import OperationalError
+
+def version() -> int: ...
+
+class PQerror(OperationalError): ...
+
+class PGconn:
+ def __init__(self, pgconn_ptr: Any): ...
+ def __del__(self) -> None: ...
+ @classmethod
+ def connect(cls, conninfo: bytes) -> "PGconn": ...
+ @classmethod
+ def connect_start(cls, conninfo: bytes) -> "PGconn": ...
+ def connect_poll(self) -> PollingStatus: ...
+ def finish(self) -> None: ...
+ @property
+ def info(self) -> List["ConninfoOption"]: ...
+ def reset(self) -> None: ...
+ def reset_start(self) -> None: ...
+ def reset_poll(self) -> PollingStatus: ...
+ @classmethod
+ def ping(self, conninfo: bytes) -> Ping: ...
+ @property
+ def db(self) -> bytes: ...
+ @property
+ def user(self) -> bytes: ...
+ @property
+ def password(self) -> bytes: ...
+ @property
+ def host(self) -> bytes: ...
+ @property
+ def hostaddr(self) -> bytes: ...
+ @property
+ def port(self) -> bytes: ...
+ @property
+ def tty(self) -> bytes: ...
+ @property
+ def options(self) -> bytes: ...
+ @property
+ def status(self) -> ConnStatus: ...
+ @property
+ def transaction_status(self) -> TransactionStatus: ...
+ def parameter_status(self, name: bytes) -> Optional[bytes]: ...
+ @property
+ def error_message(self) -> bytes: ...
+ @property
+ def protocol_version(self) -> int: ...
+ @property
+ def server_version(self) -> int: ...
+ @property
+ def socket(self) -> int: ...
+ @property
+ def backend_pid(self) -> int: ...
+ @property
+ def needs_password(self) -> bool: ...
+ @property
+ def used_password(self) -> bool: ...
+ @property
+ def ssl_in_use(self) -> bool: ...
+ def exec_(self, command: bytes) -> "PGresult": ...
+ def send_query(self, command: bytes) -> None: ...
+ def exec_params(
+ self,
+ command: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_types: Optional[Sequence[int]] = None,
+ param_formats: Optional[Sequence[Format]] = None,
+ result_format: Format = Format.TEXT,
+ ) -> "PGresult": ...
+ def send_query_params(
+ self,
+ command: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_types: Optional[Sequence[int]] = None,
+ param_formats: Optional[Sequence[Format]] = None,
+ result_format: Format = Format.TEXT,
+ ) -> None: ...
+ def send_prepare(
+ self,
+ name: bytes,
+ command: bytes,
+ param_types: Optional[Sequence[int]] = None,
+ ) -> None: ...
+ def send_query_prepared(
+ self,
+ name: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_formats: Optional[Sequence[Format]] = None,
+ result_format: Format = Format.TEXT,
+ ) -> None: ...
+ def prepare(
+ self,
+ name: bytes,
+ command: bytes,
+ param_types: Optional[Sequence[int]] = None,
+ ) -> "PGresult": ...
+ def exec_prepared(
+ self,
+ name: bytes,
+ param_values: Optional[Sequence[bytes]],
+ param_formats: Optional[Sequence[int]] = None,
+ result_format: int = 0,
+ ) -> "PGresult": ...
+ def describe_prepared(self, name: bytes) -> "PGresult": ...
+ def describe_portal(self, name: bytes) -> "PGresult": ...
+ def get_result(self) -> Optional["PGresult"]: ...
+ def consume_input(self) -> None: ...
+ def is_busy(self) -> int: ...
+ @property
+ def nonblocking(self) -> int: ...
+ @nonblocking.setter
+ def nonblocking(self, arg: int) -> None: ...
+ def flush(self) -> int: ...
+ def make_empty_result(self, exec_status: ExecStatus) -> "PGresult": ...
+
+class PGresult:
+ def __init__(self, pgresult_ptr: Any): ...
+ def __del__(self) -> None: ...
+ def clear(self) -> None: ...
+ @property
+ def status(self) -> ExecStatus: ...
+ @property
+ def error_message(self) -> bytes: ...
+ def error_field(self, fieldcode: DiagnosticField) -> Optional[bytes]: ...
+ @property
+ def ntuples(self) -> int: ...
+ @property
+ def nfields(self) -> int: ...
+ def fname(self, column_number: int) -> Optional[bytes]: ...
+ def ftable(self, column_number: int) -> int: ...
+ def ftablecol(self, column_number: int) -> int: ...
+ def fformat(self, column_number: int) -> Format: ...
+ def ftype(self, column_number: int) -> int: ...
+ def fmod(self, column_number: int) -> int: ...
+ def fsize(self, column_number: int) -> int: ...
+ @property
+ def binary_tuples(self) -> Format: ...
+ def get_value(
+ self, row_number: int, column_number: int
+ ) -> Optional[bytes]: ...
+ @property
+ def nparams(self) -> int: ...
+ def param_type(self, param_number: int) -> int: ...
+ @property
+ def command_status(self) -> Optional[bytes]: ...
+ @property
+ def command_tuples(self) -> Optional[int]: ...
+ @property
+ def oid_value(self) -> int: ...
+
+class Conninfo:
+ @classmethod
+ def get_defaults(cls) -> List[ConninfoOption]: ...
+ @classmethod
+ def parse(cls, conninfo: bytes) -> List[ConninfoOption]: ...
+ @classmethod
+ def _options_from_array(
+ cls, opts: Sequence[Any]
+ ) -> List[ConninfoOption]: ...
+
+class Escaping:
+ def __init__(self, conn: Optional[PGconn] = None): ...
+ def escape_bytea(self, data: bytes) -> bytes: ...
+ def unescape_bytea(self, data: bytes) -> bytes: ...
+
+__all__ = (
+ "ConnStatus",
+ "PollingStatus",
+ "TransactionStatus",
+ "ExecStatus",
+ "Ping",
+ "DiagnosticField",
+ "Format",
+ "PGconn",
+ "Conninfo",
+ "PQerror",
+ "error_message",
+ "ConninfoOption",
+ "py_codecs",
+ "version",
+)
from ctypes import Array, pointer, string_at
from ctypes import c_char_p, c_int, c_size_t, c_ulong
from typing import Any, Callable, List, Optional, Sequence
-from typing import cast as t_cast
+from typing import cast as t_cast, TYPE_CHECKING
from .enums import (
ConnStatus,
from . import _pq_ctypes as impl
from ..errors import OperationalError
+if TYPE_CHECKING:
+ from psycopg3 import pq # noqa
+
def version() -> int:
return impl.PQlibVersion()
raise TypeError(f"bytes expected, got {type(command)} instead")
self._ensure_pgconn()
if not impl.PQsendQuery(self.pgconn_ptr, command):
- raise PQerror(f"sending query failed: {error_message(self)}")
+ raise PQerror(
+ "sending query failed:"
+ f" {error_message(t_cast('pq.PGconn', self))}"
+ )
def exec_params(
self,
self._ensure_pgconn()
if not impl.PQsendQueryParams(*args):
raise PQerror(
- f"sending query and params failed: {error_message(self)}"
+ "sending query and params failed:"
+ f" {error_message(t_cast('pq.PGconn', self))}"
)
def send_prepare(
self.pgconn_ptr, name, command, nparams, atypes
):
raise PQerror(
- f"sending query and params failed: {error_message(self)}"
+ "sending query and params failed:"
+ f" {error_message(t_cast('pq.PGconn', self))}"
)
def send_query_prepared(
self._ensure_pgconn()
if not impl.PQsendQueryPrepared(*args):
raise PQerror(
- f"sending prepared query failed: {error_message(self)}"
+ "sending prepared query failed:"
+ f" {error_message(t_cast('pq.PGconn', self))}"
)
def _query_params_args(
def consume_input(self) -> None:
if 1 != impl.PQconsumeInput(self.pgconn_ptr):
- raise PQerror(f"consuming input failed: {error_message(self)}")
+ raise PQerror(
+ "consuming input failed:"
+ f" {error_message(t_cast('pq.PGconn', self))}"
+ )
def is_busy(self) -> int:
return impl.PQisBusy(self.pgconn_ptr)
@nonblocking.setter
def nonblocking(self, arg: int) -> None:
if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg):
- raise PQerror(f"setting nonblocking failed: {error_message(self)}")
+ raise PQerror(
+ f"setting nonblocking failed:"
+ f" {error_message(t_cast('pq.PGconn', self))}"
+ )
def flush(self) -> int:
rv: int = impl.PQflush(self.pgconn_ptr)
if rv < 0:
- raise PQerror(f"flushing failed: {error_message(self)}")
+ raise PQerror(
+ f"flushing failed:{error_message(t_cast('pq.PGconn', self))}"
+ )
return rv
def make_empty_result(self, exec_status: ExecStatus) -> "PGresult":