self._loaders_maps.append(Loader.globals)
@property
- def pgresult(self) -> Optional[pq.PGresult]:
+ def pgresult(self) -> Optional[pq.proto.PGresult]:
return self._pgresult
@pgresult.setter
- def pgresult(self, result: Optional[pq.PGresult]) -> None:
+ def pgresult(self, result: Optional[pq.proto.PGresult]) -> None:
self._pgresult = result
rc = self._row_loaders = []
ConnStatus = pq.ConnStatus
TransactionStatus = pq.TransactionStatus
- def __init__(self, pgconn: pq.PGconn):
+ def __init__(self, pgconn: pq.proto.PGconn):
self.pgconn = pgconn
self.cursor_factory = cursor.BaseCursor
self.dumpers: DumpersMap = {}
cursor_factory: Type[cursor.Cursor]
- def __init__(self, pgconn: pq.PGconn):
+ def __init__(self, pgconn: pq.proto.PGconn):
super().__init__(pgconn)
self.lock = threading.Lock()
self.cursor_factory = cursor.Cursor
cursor_factory: Type[cursor.AsyncCursor]
- def __init__(self, pgconn: pq.PGconn):
+ def __init__(self, pgconn: pq.proto.PGconn):
super().__init__(pgconn)
self.lock = asyncio.Lock()
self.cursor_factory = cursor.AsyncCursor
class Column(Sequence[Any]):
def __init__(
- self, pgresult: pq.PGresult, index: int, codec: codecs.CodecInfo
+ self, pgresult: pq.proto.PGresult, index: int, codec: codecs.CodecInfo
):
self._pgresult = pgresult
self._index = index
self._closed = False
def _reset(self) -> None:
- self._results: List[pq.PGresult] = []
+ self._results: List[pq.proto.PGresult] = []
self.pgresult = None
self._pos = 0
self._iresult = 0
return None
@property
- def pgresult(self) -> Optional[pq.PGresult]:
+ def pgresult(self) -> Optional[pq.proto.PGresult]:
return self._pgresult
@pgresult.setter
- def pgresult(self, result: Optional[pq.PGresult]) -> None:
+ def pgresult(self, result: Optional[pq.proto.PGresult]) -> None:
self._pgresult = result
if result is not None:
if self._transformer is not None:
else:
self.connection.pgconn.send_query(pgq.query)
- def _execute_results(self, results: Sequence[pq.PGresult]) -> None:
+ def _execute_results(self, results: Sequence[pq.proto.PGresult]) -> None:
"""
Implement part of execute() after waiting common to sync and async
"""
# Copyright (C) 2020 The Psycopg Team
-from typing import Any, Optional, Sequence, Type, TYPE_CHECKING
-
-if TYPE_CHECKING:
- from psycopg3.pq import PGresult # noqa
+from typing import Any, Optional, Sequence, Type
+from psycopg3.pq.proto import PGresult
class Warning(Exception):
"""
def __init__(
- self, *args: Sequence[Any], pgresult: Optional["PGresult"] = None
+ self, *args: Sequence[Any], pgresult: Optional[PGresult] = None
):
super().__init__(*args)
self.pgresult = pgresult
return DatabaseError
-def error_from_result(result: "PGresult") -> Error:
+def error_from_result(result: PGresult) -> Error:
from psycopg3 import pq
state = result.error_field(pq.DiagnosticField.SQLSTATE) or b""
logger = logging.getLogger(__name__)
-def connect(conninfo: str) -> PQGen[pq.PGconn]:
+def connect(conninfo: str) -> PQGen[pq.proto.PGconn]:
"""
Generator to create a database connection without blocking.
return conn
-def execute(pgconn: pq.PGconn) -> PQGen[List[pq.PGresult]]:
+def execute(pgconn: pq.proto.PGconn) -> PQGen[List[pq.proto.PGresult]]:
"""
Generator sending a query and returning results without blocking.
return rv
-def send(pgconn: pq.PGconn) -> PQGen[None]:
+def send(pgconn: pq.proto.PGconn) -> PQGen[None]:
"""
Generator to send a query to the server without blocking.
continue
-def fetch(pgconn: pq.PGconn) -> PQGen[List[pq.PGresult]]:
+def fetch(pgconn: pq.proto.PGconn) -> PQGen[List[pq.proto.PGresult]]:
"""
Generator retrieving results from the database without blocking.
or error).
"""
S = pq.ExecStatus
- results: List[pq.PGresult] = []
+ results: List[pq.proto.PGresult] = []
while 1:
pgconn.consume_input()
if pgconn.is_busy():
import os
import logging
-from types import ModuleType
+from typing import Callable, Type
from .enums import (
ConnStatus,
Format,
)
from .encodings import py_codecs
-from .misc import error_message, ConninfoOption
+from .misc import error_message, ConninfoOption, PQerror
+from . import proto
logger = logging.getLogger(__name__)
+__impl__: str
+version: Callable[[], int]
+PGconn: Type[proto.PGconn]
+PGresult: Type[proto.PGresult]
+Conninfo: Type[proto.Conninfo]
+Escaping: Type[proto.Escaping]
-def import_libpq() -> ModuleType:
+
+def import_from_libpq() -> None:
"""
- Find the best libpw wrapper available.
+ Import pq objects implementation from the best libpw wrapper available.
"""
+ global __impl__, version, PGconn, PGresult, Conninfo, Escaping
+
impl = os.environ.get("PSYCOPG3_IMPL", "").lower()
if not impl or impl == "c":
try:
- from . import pq_cython
+ # TODO: extension module not recognised by mypy?
+ from . import pq_cython # type: ignore
except Exception as e:
if not impl:
logger.debug(f"C pq wrapper not available: %s", e)
f"requested pq implementation '{impl}' not available"
) from e
else:
- return pq_cython
+ __impl__ = pq_cython.__impl__
+ version = pq_cython.version
+ PGconn = pq_cython.PGconn
+ PGresult = pq_cython.PGresult
+ Conninfo = pq_cython.Conninfo
+ Escaping = pq_cython.Escaping
+ return
if not impl or impl == "ctypes":
try:
f"requested pq implementation '{impl}' not available"
) from e
else:
- return pq_ctypes
+ __impl__ = pq_ctypes.__impl__
+ version = pq_ctypes.version
+ PGconn = pq_ctypes.PGconn
+ PGresult = pq_ctypes.PGresult
+ Conninfo = pq_ctypes.Conninfo
+ Escaping = pq_ctypes.Escaping
+ return
if impl:
raise ImportError(f"requested pq impementation '{impl}' unknown")
raise ImportError(f"no pq wrapper available")
-pq_module = import_libpq()
-
-__impl__ = pq_module.__impl__
-version = pq_module.version
-PGconn = pq_module.PGconn
-PGresult = pq_module.PGresult
-PQerror = pq_module.PQerror
-Conninfo = pq_module.Conninfo
-Escaping = pq_module.Escaping
+import_from_libpq()
__all__ = (
"ConnStatus",
+++ /dev/null
-"""
-Public interface for the psycopg3.pq module
-
-This is provided as a stub because the implementation can be different.
-"""
-
-# Copyright (C) 2020 The Psycopg Team
-
-from typing import Any, List, Optional, Sequence
-
-from .enums import (
- ConnStatus,
- PollingStatus,
- ExecStatus,
- TransactionStatus,
- Ping,
- DiagnosticField,
- Format,
-)
-from .misc import error_message, ConninfoOption
-from .encodings import py_codecs
-from ..errors import OperationalError
-
-def version() -> int: ...
-
-class PQerror(OperationalError): ...
-
-class PGconn:
- def __init__(self, pgconn_ptr: Any): ...
- def __del__(self) -> None: ...
- @classmethod
- def connect(cls, conninfo: bytes) -> "PGconn": ...
- @classmethod
- def connect_start(cls, conninfo: bytes) -> "PGconn": ...
- def connect_poll(self) -> PollingStatus: ...
- def finish(self) -> None: ...
- @property
- def info(self) -> List["ConninfoOption"]: ...
- def reset(self) -> None: ...
- def reset_start(self) -> None: ...
- def reset_poll(self) -> PollingStatus: ...
- @classmethod
- def ping(self, conninfo: bytes) -> Ping: ...
- @property
- def db(self) -> bytes: ...
- @property
- def user(self) -> bytes: ...
- @property
- def password(self) -> bytes: ...
- @property
- def host(self) -> bytes: ...
- @property
- def hostaddr(self) -> bytes: ...
- @property
- def port(self) -> bytes: ...
- @property
- def tty(self) -> bytes: ...
- @property
- def options(self) -> bytes: ...
- @property
- def status(self) -> ConnStatus: ...
- @property
- def transaction_status(self) -> TransactionStatus: ...
- def parameter_status(self, name: bytes) -> Optional[bytes]: ...
- @property
- def error_message(self) -> bytes: ...
- @property
- def protocol_version(self) -> int: ...
- @property
- def server_version(self) -> int: ...
- @property
- def socket(self) -> int: ...
- @property
- def backend_pid(self) -> int: ...
- @property
- def needs_password(self) -> bool: ...
- @property
- def used_password(self) -> bool: ...
- @property
- def ssl_in_use(self) -> bool: ...
- def exec_(self, command: bytes) -> "PGresult": ...
- def send_query(self, command: bytes) -> None: ...
- def exec_params(
- self,
- command: bytes,
- param_values: Optional[Sequence[Optional[bytes]]],
- param_types: Optional[Sequence[int]] = None,
- param_formats: Optional[Sequence[Format]] = None,
- result_format: Format = Format.TEXT,
- ) -> "PGresult": ...
- def send_query_params(
- self,
- command: bytes,
- param_values: Optional[Sequence[Optional[bytes]]],
- param_types: Optional[Sequence[int]] = None,
- param_formats: Optional[Sequence[Format]] = None,
- result_format: Format = Format.TEXT,
- ) -> None: ...
- def send_prepare(
- self,
- name: bytes,
- command: bytes,
- param_types: Optional[Sequence[int]] = None,
- ) -> None: ...
- def send_query_prepared(
- self,
- name: bytes,
- param_values: Optional[Sequence[Optional[bytes]]],
- param_formats: Optional[Sequence[Format]] = None,
- result_format: Format = Format.TEXT,
- ) -> None: ...
- def prepare(
- self,
- name: bytes,
- command: bytes,
- param_types: Optional[Sequence[int]] = None,
- ) -> "PGresult": ...
- def exec_prepared(
- self,
- name: bytes,
- param_values: Optional[Sequence[bytes]],
- param_formats: Optional[Sequence[int]] = None,
- result_format: int = 0,
- ) -> "PGresult": ...
- def describe_prepared(self, name: bytes) -> "PGresult": ...
- def describe_portal(self, name: bytes) -> "PGresult": ...
- def get_result(self) -> Optional["PGresult"]: ...
- def consume_input(self) -> None: ...
- def is_busy(self) -> int: ...
- @property
- def nonblocking(self) -> int: ...
- @nonblocking.setter
- def nonblocking(self, arg: int) -> None: ...
- def flush(self) -> int: ...
- def make_empty_result(self, exec_status: ExecStatus) -> "PGresult": ...
-
-class PGresult:
- def __init__(self, pgresult_ptr: Any): ...
- def __del__(self) -> None: ...
- def clear(self) -> None: ...
- @property
- def status(self) -> ExecStatus: ...
- @property
- def error_message(self) -> bytes: ...
- def error_field(self, fieldcode: DiagnosticField) -> Optional[bytes]: ...
- @property
- def ntuples(self) -> int: ...
- @property
- def nfields(self) -> int: ...
- def fname(self, column_number: int) -> Optional[bytes]: ...
- def ftable(self, column_number: int) -> int: ...
- def ftablecol(self, column_number: int) -> int: ...
- def fformat(self, column_number: int) -> Format: ...
- def ftype(self, column_number: int) -> int: ...
- def fmod(self, column_number: int) -> int: ...
- def fsize(self, column_number: int) -> int: ...
- @property
- def binary_tuples(self) -> Format: ...
- def get_value(
- self, row_number: int, column_number: int
- ) -> Optional[bytes]: ...
- @property
- def nparams(self) -> int: ...
- def param_type(self, param_number: int) -> int: ...
- @property
- def command_status(self) -> Optional[bytes]: ...
- @property
- def command_tuples(self) -> Optional[int]: ...
- @property
- def oid_value(self) -> int: ...
-
-class Conninfo:
- @classmethod
- def get_defaults(cls) -> List[ConninfoOption]: ...
- @classmethod
- def parse(cls, conninfo: bytes) -> List[ConninfoOption]: ...
- @classmethod
- def _options_from_array(
- cls, opts: Sequence[Any]
- ) -> List[ConninfoOption]: ...
-
-class Escaping:
- def __init__(self, conn: Optional[PGconn] = None): ...
- def escape_bytea(self, data: bytes) -> bytes: ...
- def unescape_bytea(self, data: bytes) -> bytes: ...
-
-__all__ = (
- "ConnStatus",
- "PollingStatus",
- "TransactionStatus",
- "ExecStatus",
- "Ping",
- "DiagnosticField",
- "Format",
- "PGconn",
- "Conninfo",
- "PQerror",
- "error_message",
- "ConninfoOption",
- "py_codecs",
- "version",
-)
# Copyright (C) 2020 The Psycopg Team
from collections import namedtuple
-from typing import TYPE_CHECKING, Union
+from typing import cast, Union
-if TYPE_CHECKING:
- from psycopg3.pq import PGconn, PGresult # noqa
+from ..errors import OperationalError
+from .enums import DiagnosticField
+from .proto import PGconn, PGresult
+
+
+class PQerror(OperationalError):
+ pass
ConninfoOption = namedtuple(
)
-def error_message(obj: Union["PGconn", "PGresult"]) -> str:
+def error_message(obj: Union[PGconn, PGresult]) -> str:
"""
Return an error message from a PGconn or PGresult.
The return value is a str (unlike pq data which is usually bytes).
"""
- from psycopg3 import pq
-
bmsg: bytes
- if isinstance(obj, pq.PGconn):
- bmsg = obj.error_message
-
- # strip severity and whitespaces
- if bmsg:
- bmsg = bmsg.splitlines()[0].split(b":", 1)[-1].strip()
+ if hasattr(obj, "error_field"):
+ obj = cast(PGresult, obj)
- elif isinstance(obj, pq.PGresult):
- bmsg = obj.error_field(pq.DiagnosticField.MESSAGE_PRIMARY) or b""
+ bmsg = obj.error_field(DiagnosticField.MESSAGE_PRIMARY) or b""
if not bmsg:
bmsg = obj.error_message
if bmsg:
bmsg = bmsg.splitlines()[0].split(b":", 1)[-1].strip()
+ elif hasattr(obj, "error_message"):
+ # obj is a PGconn
+ bmsg = obj.error_message
+
+ # strip severity and whitespaces
+ if bmsg:
+ bmsg = bmsg.splitlines()[0].split(b":", 1)[-1].strip()
+
else:
raise TypeError(
f"PGconn or PGresult expected, got {type(obj).__name__}"
DiagnosticField,
Format,
)
-from .misc import error_message, ConninfoOption
+from .misc import error_message, ConninfoOption, PQerror
from . import _pq_ctypes as impl
-from ..errors import OperationalError
if TYPE_CHECKING:
from psycopg3 import pq # noqa
return impl.PQlibVersion()
-class PQerror(OperationalError):
- pass
-
-
class PGconn:
__slots__ = ("pgconn_ptr",)
raise TypeError(f"bytes expected, got {type(command)} instead")
self._ensure_pgconn()
if not impl.PQsendQuery(self.pgconn_ptr, command):
- raise PQerror(
- "sending query failed:"
- f" {error_message(t_cast('pq.PGconn', self))}"
- )
+ raise PQerror(f"sending query failed: {error_message(self)}")
def exec_params(
self,
self._ensure_pgconn()
if not impl.PQsendQueryParams(*args):
raise PQerror(
- "sending query and params failed:"
- f" {error_message(t_cast('pq.PGconn', self))}"
+ f"sending query and params failed: {error_message(self)}"
)
def send_prepare(
self.pgconn_ptr, name, command, nparams, atypes
):
raise PQerror(
- "sending query and params failed:"
- f" {error_message(t_cast('pq.PGconn', self))}"
+ f"sending query and params failed: {error_message(self)}"
)
def send_query_prepared(
self._ensure_pgconn()
if not impl.PQsendQueryPrepared(*args):
raise PQerror(
- "sending prepared query failed:"
- f" {error_message(t_cast('pq.PGconn', self))}"
+ f"sending prepared query failed: {error_message(self)}"
)
def _query_params_args(
def consume_input(self) -> None:
if 1 != impl.PQconsumeInput(self.pgconn_ptr):
- raise PQerror(
- "consuming input failed:"
- f" {error_message(t_cast('pq.PGconn', self))}"
- )
+ raise PQerror(f"consuming input failed: {error_message(self)}")
def is_busy(self) -> int:
return impl.PQisBusy(self.pgconn_ptr)
@nonblocking.setter
def nonblocking(self, arg: int) -> None:
if 0 > impl.PQsetnonblocking(self.pgconn_ptr, arg):
- raise PQerror(
- f"setting nonblocking failed:"
- f" {error_message(t_cast('pq.PGconn', self))}"
- )
+ raise PQerror(f"setting nonblocking failed: {error_message(self)}")
def flush(self) -> int:
rv: int = impl.PQflush(self.pgconn_ptr)
if rv < 0:
- raise PQerror(
- f"flushing failed:{error_message(t_cast('pq.PGconn', self))}"
- )
+ raise PQerror(f"flushing failed: {error_message(self)}")
return rv
def make_empty_result(self, exec_status: ExecStatus) -> "PGresult":
from psycopg3.pq cimport libpq as impl
from psycopg3.errors import OperationalError
-from .misc import error_message, ConninfoOption
+from .misc import error_message, ConninfoOption, PQerror
from .enums import (
ConnStatus,
PollingStatus,
__impl__ = 'c'
-class PQerror(OperationalError):
- pass
-
-
def version():
return impl.PQlibVersion()
def host(self) -> bytes:
return self._call_bytes(impl.PQhost)
- # @property
- # def hostaddr(self) -> bytes:
- # return self._call_bytes(impl.PQhostaddr)
+ @property
+ def hostaddr(self) -> bytes:
+ return b'TODO'
@property
def port(self) -> bytes:
--- /dev/null
+from typing import Any, List, Optional, Sequence, TYPE_CHECKING
+from typing_extensions import Protocol
+
+from .enums import (
+ ConnStatus,
+ PollingStatus,
+ ExecStatus,
+ TransactionStatus,
+ Ping,
+ DiagnosticField,
+ Format,
+)
+
+if TYPE_CHECKING:
+ from .misc import ConninfoOption # noqa
+
+
+class PGconn(Protocol):
+ @classmethod
+ def connect(cls, conninfo: bytes) -> "PGconn":
+ ...
+
+ @classmethod
+ def connect_start(cls, conninfo: bytes) -> "PGconn":
+ ...
+
+ def connect_poll(self) -> PollingStatus:
+ ...
+
+ def finish(self) -> None:
+ ...
+
+ @property
+ def info(self) -> List["ConninfoOption"]:
+ ...
+
+ def reset(self) -> None:
+ ...
+
+ def reset_start(self) -> None:
+ ...
+
+ def reset_poll(self) -> PollingStatus:
+ ...
+
+ @classmethod
+ def ping(self, conninfo: bytes) -> Ping:
+ ...
+
+ @property
+ def db(self) -> bytes:
+ ...
+
+ @property
+ def user(self) -> bytes:
+ ...
+
+ @property
+ def password(self) -> bytes:
+ ...
+
+ @property
+ def host(self) -> bytes:
+ ...
+
+ @property
+ def hostaddr(self) -> bytes:
+ ...
+
+ @property
+ def port(self) -> bytes:
+ ...
+
+ @property
+ def tty(self) -> bytes:
+ ...
+
+ @property
+ def options(self) -> bytes:
+ ...
+
+ @property
+ def status(self) -> ConnStatus:
+ ...
+
+ @property
+ def transaction_status(self) -> TransactionStatus:
+ ...
+
+ def parameter_status(self, name: bytes) -> Optional[bytes]:
+ ...
+
+ @property
+ def error_message(self) -> bytes:
+ ...
+
+ @property
+ def protocol_version(self) -> int:
+ ...
+
+ @property
+ def server_version(self) -> int:
+ ...
+
+ @property
+ def socket(self) -> int:
+ ...
+
+ @property
+ def backend_pid(self) -> int:
+ ...
+
+ @property
+ def needs_password(self) -> bool:
+ ...
+
+ @property
+ def used_password(self) -> bool:
+ ...
+
+ @property
+ def ssl_in_use(self) -> bool:
+ ...
+
+ def exec_(self, command: bytes) -> "PGresult":
+ ...
+
+ def send_query(self, command: bytes) -> None:
+ ...
+
+ def exec_params(
+ self,
+ command: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_types: Optional[Sequence[int]] = None,
+ param_formats: Optional[Sequence[Format]] = None,
+ result_format: Format = Format.TEXT,
+ ) -> "PGresult":
+ ...
+
+ def send_query_params(
+ self,
+ command: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_types: Optional[Sequence[int]] = None,
+ param_formats: Optional[Sequence[Format]] = None,
+ result_format: Format = Format.TEXT,
+ ) -> None:
+ ...
+
+ def send_prepare(
+ self,
+ name: bytes,
+ command: bytes,
+ param_types: Optional[Sequence[int]] = None,
+ ) -> None:
+ ...
+
+ def send_query_prepared(
+ self,
+ name: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_formats: Optional[Sequence[Format]] = None,
+ result_format: Format = Format.TEXT,
+ ) -> None:
+ ...
+
+ def prepare(
+ self,
+ name: bytes,
+ command: bytes,
+ param_types: Optional[Sequence[int]] = None,
+ ) -> "PGresult":
+ ...
+
+ def exec_prepared(
+ self,
+ name: bytes,
+ param_values: Optional[Sequence[bytes]],
+ param_formats: Optional[Sequence[int]] = None,
+ result_format: int = 0,
+ ) -> "PGresult":
+ ...
+
+ def describe_prepared(self, name: bytes) -> "PGresult":
+ ...
+
+ def describe_portal(self, name: bytes) -> "PGresult":
+ ...
+
+ def get_result(self) -> Optional["PGresult"]:
+ ...
+
+ def consume_input(self) -> None:
+ ...
+
+ def is_busy(self) -> int:
+ ...
+
+ @property
+ def nonblocking(self) -> int:
+ ...
+
+ @nonblocking.setter
+ def nonblocking(self, arg: int) -> None:
+ ...
+
+ def flush(self) -> int:
+ ...
+
+ def make_empty_result(self, exec_status: ExecStatus) -> "PGresult":
+ ...
+
+
+class PGresult(Protocol):
+ def clear(self) -> None:
+ ...
+
+ @property
+ def status(self) -> ExecStatus:
+ ...
+
+ @property
+ def error_message(self) -> bytes:
+ ...
+
+ def error_field(self, fieldcode: DiagnosticField) -> Optional[bytes]:
+ ...
+
+ @property
+ def ntuples(self) -> int:
+ ...
+
+ @property
+ def nfields(self) -> int:
+ ...
+
+ def fname(self, column_number: int) -> Optional[bytes]:
+ ...
+
+ def ftable(self, column_number: int) -> int:
+ ...
+
+ def ftablecol(self, column_number: int) -> int:
+ ...
+
+ def fformat(self, column_number: int) -> Format:
+ ...
+
+ def ftype(self, column_number: int) -> int:
+ ...
+
+ def fmod(self, column_number: int) -> int:
+ ...
+
+ def fsize(self, column_number: int) -> int:
+ ...
+
+ @property
+ def binary_tuples(self) -> Format:
+ ...
+
+ def get_value(
+ self, row_number: int, column_number: int
+ ) -> Optional[bytes]:
+ ...
+
+ @property
+ def nparams(self) -> int:
+ ...
+
+ def param_type(self, param_number: int) -> int:
+ ...
+
+ @property
+ def command_status(self) -> Optional[bytes]:
+ ...
+
+ @property
+ def command_tuples(self) -> Optional[int]:
+ ...
+
+ @property
+ def oid_value(self) -> int:
+ ...
+
+
+class Conninfo(Protocol):
+ @classmethod
+ def get_defaults(cls) -> List["ConninfoOption"]:
+ ...
+
+ @classmethod
+ def parse(cls, conninfo: bytes) -> List["ConninfoOption"]:
+ ...
+
+ @classmethod
+ def _options_from_array(
+ cls, opts: Sequence[Any]
+ ) -> List["ConninfoOption"]:
+ ...
+
+
+class Escaping(Protocol):
+ def __init__(self, conn: Optional[PGconn] = None):
+ ...
+
+ def escape_bytea(self, data: bytes) -> bytes:
+ ...
+
+ def unescape_bytea(self, data: bytes) -> bytes:
+ ...
packages=find_packages(exclude=["tests"]),
classifiers=[x for x in classifiers.split("\n") if x],
setup_requires=["Cython"],
- install_requires=["Cython"],
+ install_requires=["Cython", "typing_extensions"],
zip_safe=False,
version=version,
project_urls={