class TypeInfo:
"""
Hold information about a PostgreSQL base type.
-
- The class allows to:
-
- - read information about a range type using `fetch()` and `fetch_async()`
- - configure a composite type adaptation using `register()`
"""
__module__ = "psycopg.types"
if isinstance(name, Composable):
name = name.as_string(conn)
+
cur = conn.cursor(binary=True, row_factory=dict_row)
- cur.execute(cls._info_query, {"name": name})
+ # This might result in a nested transaction. What we want is to leave
+ # the function with the connection in the state we found (either idle
+ # or intrans)
+ try:
+ with conn.transaction():
+ cur.execute(cls._info_query, {"name": name})
+ except e.UndefinedObject:
+ return None
+
recs = cur.fetchall()
return cls._fetch(name, recs)
name = name.as_string(conn)
cur = conn.cursor(binary=True, row_factory=dict_row)
- await cur.execute(cls._info_query, {"name": name})
+ try:
+ async with conn.transaction():
+ await cur.execute(cls._info_query, {"name": name})
+ except e.UndefinedObject:
+ return None
+
recs = await cur.fetchall()
return cls._fetch(name, recs)
--- /dev/null
+import pytest
+
+import psycopg
+from psycopg import sql
+from psycopg.pq import TransactionStatus
+from psycopg.types import TypeInfo
+
+
+@pytest.mark.parametrize("name", ["text", sql.Identifier("text")])
+@pytest.mark.parametrize("status", ["IDLE", "INTRANS"])
+def test_fetch(conn, name, status):
+ status = getattr(TransactionStatus, status)
+ if status == TransactionStatus.INTRANS:
+ conn.execute("select 1")
+
+ assert conn.info.transaction_status == status
+ info = TypeInfo.fetch(conn, name)
+ assert conn.info.transaction_status == status
+
+ assert info.name == "text"
+ # TODO: add the schema?
+ # assert info.schema == "pg_catalog"
+
+ assert info.oid == psycopg.adapters.types["text"].oid
+ assert info.array_oid == psycopg.adapters.types["text"].array_oid
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("name", ["text", sql.Identifier("text")])
+@pytest.mark.parametrize("status", ["IDLE", "INTRANS"])
+async def test_fetch_async(aconn, name, status):
+ status = getattr(TransactionStatus, status)
+ if status == TransactionStatus.INTRANS:
+ await aconn.execute("select 1")
+
+ assert aconn.info.transaction_status == status
+ info = await TypeInfo.fetch_async(aconn, name)
+ assert aconn.info.transaction_status == status
+
+ assert info.name == "text"
+ # assert info.schema == "pg_catalog"
+ assert info.oid == psycopg.adapters.types["text"].oid
+ assert info.array_oid == psycopg.adapters.types["text"].array_oid
+
+
+@pytest.mark.parametrize("name", ["nosuch", sql.Identifier("nosuch")])
+@pytest.mark.parametrize("status", ["IDLE", "INTRANS"])
+def test_fetch_not_found(conn, name, status):
+ status = getattr(TransactionStatus, status)
+ if status == TransactionStatus.INTRANS:
+ conn.execute("select 1")
+
+ assert conn.info.transaction_status == status
+ info = TypeInfo.fetch(conn, name)
+ assert conn.info.transaction_status == status
+ assert info is None
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("name", ["nosuch", sql.Identifier("nosuch")])
+@pytest.mark.parametrize("status", ["IDLE", "INTRANS"])
+async def test_fetch_not_found_async(aconn, name, status):
+ status = getattr(TransactionStatus, status)
+ if status == TransactionStatus.INTRANS:
+ await aconn.execute("select 1")
+
+ assert aconn.info.transaction_status == status
+ info = await TypeInfo.fetch_async(aconn, name)
+ assert aconn.info.transaction_status == status
+
+ assert info is None
+
+
+@pytest.mark.parametrize(
+ "name", ["testschema.testtype", sql.Identifier("testschema", "testtype")]
+)
+def test_fetch_by_schema_qualified_string(conn, name):
+ conn.execute("create schema if not exists testschema")
+ conn.execute("create type testschema.testtype as (foo text)")
+
+ info = TypeInfo.fetch(conn, name)
+ assert info.name == "testtype"
+ # assert info.schema == "testschema"
+ cur = conn.execute(
+ """
+ select oid, typarray from pg_type
+ where oid = 'testschema.testtype'::regtype
+ """
+ )
+ assert cur.fetchone() == (info.oid, info.array_oid)
+
+
+@pytest.mark.parametrize(
+ "name",
+ [
+ "text",
+ # TODO: support these?
+ # "pg_catalog.text",
+ # sql.Identifier("text"),
+ # sql.Identifier("pg_catalog", "text"),
+ ],
+)
+def test_registry_by_builtin_name(conn, name):
+ info = psycopg.adapters.types[name]
+ assert info.name == "text"
+ assert info.oid == 25