# Copyright (C) 2020 The Psycopg Team
-from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union
+from dataclasses import dataclass, field, fields
+from typing import Any, Callable, Dict, List, NoReturn, Optional, Sequence, Tuple, Type
+from typing import Union, TYPE_CHECKING
from typing_extensions import TypeAlias
from asyncio import CancelledError
from .pq.abc import PGconn, PGresult
-from .pq._enums import DiagnosticField
+from .pq._enums import ConnStatus, DiagnosticField, PipelineStatus, TransactionStatus
from ._compat import TypeGuard
+if TYPE_CHECKING:
+ from .pq.misc import PGnotify, ConninfoOption
+
ErrorInfo: TypeAlias = Union[None, PGresult, Dict[int, Optional[bytes]]]
_sqlcodes: Dict[str, "Type[Error]"] = {}
+@dataclass
+class FinishedPGconn:
+ """Finished libpq connection.
+
+ Attributes are set from a real `~pscopg.pq.PGconn` but any operations will
+ raise an `~psycopg.OperationalError`.
+ """
+
+ info: List["ConninfoOption"] = field(default_factory=list)
+
+ db: bytes = b""
+ user: bytes = b""
+ password: bytes = b""
+ host: bytes = b""
+ hostaddr: bytes = b""
+ port: bytes = b""
+ tty: bytes = b""
+ options: bytes = b""
+ status: int = ConnStatus.BAD.value
+ transaction_status: int = TransactionStatus.UNKNOWN.value
+ pipeline_status: int = PipelineStatus.OFF.value
+
+ error_message: bytes = b""
+ server_version: int = 0
+
+ backend_pid: int = 0
+ needs_password: bool = False
+ used_password: bool = False
+ ssl_in_use: bool = False
+
+ nonblocking: int = 0
+
+ notice_handler: Optional[Callable[["PGresult"], None]] = None
+ notify_handler: Optional[Callable[["PGnotify"], None]] = None
+
+ @staticmethod
+ def _raise() -> NoReturn:
+ raise OperationalError("the connection is closed")
+
+ @classmethod
+ def connect(cls, *args: Any) -> NoReturn:
+ raise TypeError(f"{cls} is unusable")
+
+ @classmethod
+ def connect_start(cls, *args: Any) -> NoReturn:
+ raise TypeError(f"{cls} is unusable")
+
+ def connect_poll(self) -> NoReturn:
+ self._raise()
+
+ def finish(self) -> None:
+ pass
+
+ def reset(self) -> NoReturn:
+ self._raise()
+
+ def reset_start(self) -> NoReturn:
+ self._raise()
+
+ def reset_poll(self) -> NoReturn:
+ self._raise()
+
+ @classmethod
+ def ping(cls, *args: Any) -> NoReturn:
+ raise TypeError(f"{cls} is unusable")
+
+ def parameter_status(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ @property
+ def socket(self) -> NoReturn:
+ self._raise()
+
+ def exec_(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def send_query(self, *args: Any) -> None:
+ self._raise()
+
+ def exec_params(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def send_query_params(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def send_prepare(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def send_query_prepared(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def prepare(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def exec_prepared(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def describe_prepared(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def send_describe_prepared(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def describe_portal(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def send_describe_portal(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def get_result(self) -> NoReturn:
+ self._raise()
+
+ def consume_input(self) -> NoReturn:
+ self._raise()
+
+ def is_busy(self) -> NoReturn:
+ self._raise()
+
+ def flush(self) -> NoReturn:
+ self._raise()
+
+ def set_single_row_mode(self) -> NoReturn:
+ self._raise()
+
+ def get_cancel(self) -> NoReturn:
+ self._raise()
+
+ def notifies(self) -> NoReturn:
+ self._raise()
+
+ def put_copy_data(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def put_copy_end(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def get_copy_data(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def trace(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def set_trace_flags(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def untrace(self) -> NoReturn:
+ self._raise()
+
+ def encrypt_password(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def make_empty_result(self, *args: Any) -> NoReturn:
+ self._raise()
+
+ def enter_pipeline_mode(self) -> NoReturn:
+ self._raise()
+
+ def exit_pipeline_mode(self) -> NoReturn:
+ self._raise()
+
+ def pipeline_sync(self) -> NoReturn:
+ self._raise()
+
+ def send_flush_request(self) -> NoReturn:
+ self._raise()
+
+
+def finish_pgconn(pgconn: PGconn) -> PGconn:
+ args = {}
+ for f in fields(FinishedPGconn):
+ try:
+ args[f.name] = getattr(pgconn, f.name)
+ except Exception:
+ pass
+ pgconn.finish()
+ return FinishedPGconn(**args)
+
+
class Warning(Exception):
"""
Exception raised for important warnings.