]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor: drop internal use of Union[] type definition in TypeInfo.fetch() 475/head
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 6 Jan 2023 17:29:59 +0000 (17:29 +0000)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 6 Jan 2023 19:33:45 +0000 (19:33 +0000)
psycopg/psycopg/_typeinfo.py

index bb8bbc55d8fc4c3e8865dfd846c046a85e5eff6c..958799ab4d71cd3ce73900120ee7601b10a59b72 100644 (file)
@@ -62,28 +62,33 @@ class TypeInfo:
     @overload
     @classmethod
     async def fetch(
-        cls: Type[T],
-        conn: "AsyncConnection[Any]",
-        name: Union[str, "Identifier"],
+        cls: Type[T], conn: "AsyncConnection[Any]", name: Union[str, "Identifier"]
     ) -> Optional[T]:
         ...
 
     @classmethod
     def fetch(
-        cls: Type[T],
-        conn: Union["Connection[Any]", "AsyncConnection[Any]"],
-        name: Union[str, "Identifier"],
+        cls: Type[T], conn: "BaseConnection[Any]", name: Union[str, "Identifier"]
     ) -> Any:
         """Query a system catalog to read information about a type."""
         from .sql import Composable
+        from .connection import Connection
         from .connection_async import AsyncConnection
 
         if isinstance(name, Composable):
             name = name.as_string(conn)
 
-        if isinstance(conn, AsyncConnection):
+        if isinstance(conn, Connection):
+            return cls._fetch(conn, name)
+        elif isinstance(conn, AsyncConnection):
             return cls._fetch_async(conn, name)
+        else:
+            raise TypeError(
+                f"expected Connection or AsyncConnection, got {type(conn).__name__}"
+            )
 
+    @classmethod
+    def _fetch(cls: Type[T], conn: "Connection[Any]", name: str) -> Optional[T]:
         # This might result in a nested transaction. What we want is to leave
         # the function with the connection in the state we found (either idle
         # or intrans)
@@ -101,11 +106,6 @@ class TypeInfo:
     async def _fetch_async(
         cls: Type[T], conn: "AsyncConnection[Any]", name: str
     ) -> Optional[T]:
-        """
-        Query a system catalog to read information about a type.
-
-        Similar to `fetch()` but can use an asynchronous connection.
-        """
         try:
             async with conn.transaction():
                 async with conn.cursor(binary=True, row_factory=dict_row) as cur: