name: Union[str, "Identifier"],
) -> Any:
"""Query a system catalog to read information about a type."""
- from .connection_async import AsyncConnection
-
- if isinstance(conn, AsyncConnection):
- return cls._fetch_async(conn, name)
-
from .sql import Composable
+ from .connection_async import AsyncConnection
if isinstance(name, Composable):
name = name.as_string(conn)
+ if isinstance(conn, AsyncConnection):
+ return cls._fetch_async(conn, name)
+
# This might result in a nested transaction. What we want is to leave
# the function with the connection in the state we found (either idle
# or intrans)
@classmethod
async def _fetch_async(
- cls: Type[T],
- conn: "AsyncConnection[Any]",
- name: Union[str, "Identifier"],
+ cls: Type[T], conn: "AsyncConnection[Any]", name: str
) -> Optional[T]:
"""
Query a system catalog to read information about a type.
Similar to `fetch()` but can use an asynchronous connection.
"""
- from .sql import Composable
-
- if isinstance(name, Composable):
- name = name.as_string(conn)
-
try:
async with conn.transaction():
async with conn.cursor(binary=True, row_factory=dict_row) as cur: