__module__ = "psycopg.types.range"
- def __init__(self, name: str, oid: int, array_oid: int, subtype_oid: int):
- super().__init__(name, oid, array_oid)
+ def __init__(
+ self,
+ name: str,
+ oid: int,
+ array_oid: int,
+ *,
+ alt_name: str = "",
+ subtype_oid: int,
+ ):
+ super().__init__(name, oid, array_oid, alt_name=alt_name)
self.subtype_oid = subtype_oid
@classmethod
) -> str:
return """\
SELECT t.typname AS name, t.oid AS oid, t.typarray AS array_oid,
+ t.oid::regtype::text AS alt_name,
r.rngsubtype AS subtype_oid
FROM pg_type t
JOIN pg_range r ON t.oid = r.rngtypid
name: str,
oid: int,
array_oid: int,
+ *,
+ alt_name: str = "",
range_oid: int,
subtype_oid: int,
):
- super().__init__(name, oid, array_oid)
+ super().__init__(name, oid, array_oid, alt_name=alt_name)
self.range_oid = range_oid
self.subtype_oid = subtype_oid
)
return """\
SELECT t.typname AS name, t.oid AS oid, t.typarray AS array_oid,
+ t.oid::regtype::text AS alt_name,
r.rngtypid AS range_oid, r.rngsubtype AS subtype_oid
FROM pg_type t
JOIN pg_range r ON t.oid = r.rngmultitypid
name: str,
oid: int,
array_oid: int,
+ *,
+ alt_name: str = "",
field_names: Sequence[str],
field_types: Sequence[int],
):
- super().__init__(name, oid, array_oid)
+ super().__init__(name, oid, array_oid, alt_name=alt_name)
self.field_names = field_names
self.field_types = field_types
# Will be set by register() if the `factory` is a type
return """\
SELECT
t.typname AS name, t.oid AS oid, t.typarray AS array_oid,
+ t.oid::regtype::text AS alt_name,
coalesce(a.fnames, '{}') AS field_names,
coalesce(a.ftypes, '{}') AS field_types
FROM pg_type t
import pytest
-from psycopg import pq, postgres
-from psycopg.sql import Identifier
+from psycopg import pq, postgres, sql
from psycopg.adapt import PyFormat
from psycopg.postgres import types as builtins
from psycopg.types.range import Range
[("foo", "text"), ("bar", "int8"), ("qux", "bool")],
),
(
- Identifier("testcomp"),
+ sql.Identifier("testcomp"),
[("foo", "text"), ("bar", "int8"), ("baz", "float8")],
),
(
- Identifier("testschema", "testcomp"),
+ sql.Identifier("testschema", "testcomp"),
[("foo", "text"), ("bar", "int8"), ("qux", "bool")],
),
]
conn.execute("insert into meh values (%s)", [obj])
got = conn.execute("select wat from meh").fetchone()[0]
assert obj == got
+
+
+@pytest.mark.parametrize("name", ["a-b", f"{eur}", "order"])
+def test_literal_invalid_name(conn, name):
+ conn.execute("set client_encoding to utf8")
+ conn.execute(f'create type "{name}" as (foo text)')
+ info = CompositeInfo.fetch(conn, f'"{name}"')
+ register_composite(info, conn)
+ obj = info.python_type("hello")
+ assert sql.Literal(obj).as_string(conn) == f"'(hello)'::\"{name}\""
+ cur = conn.execute(sql.SQL("select {}").format(obj))
+ got = cur.fetchone()[0]
+ assert got == obj
+ assert type(got) is type(obj)
import pytest
-from psycopg import pq
+from psycopg import pq, sql
from psycopg import errors as e
-from psycopg.sql import Identifier
from psycopg.adapt import PyFormat
from psycopg.types.range import Range
from psycopg.types import multirange
from psycopg.types.multirange import Multirange, MultirangeInfo
from psycopg.types.multirange import register_multirange
-from .test_range import create_test_range
+from .test_range import create_test_range, eur
pytestmark = pytest.mark.pg(">= 14")
fetch_cases = [
("testmultirange", "text"),
("testschema.testmultirange", "float8"),
- (Identifier("testmultirange"), "text"),
- (Identifier("testschema", "testmultirange"), "float8"),
+ (sql.Identifier("testmultirange"), "text"),
+ (sql.Identifier("testschema", "testmultirange"), "float8"),
]
(got,) = cur.execute("select '{}'::testmultirange").fetchone()
assert isinstance(got, Multirange)
assert not got
+
+
+@pytest.mark.parametrize("name", ["a-b", f"{eur}"])
+def test_literal_invalid_name(conn, name):
+ conn.execute("set client_encoding to utf8")
+ conn.execute(f'create type "{name}" as range (subtype = text)')
+ info = MultirangeInfo.fetch(conn, f'"{name}_multirange"')
+ register_multirange(info, conn)
+ obj = Multirange([Range("a", "z", "[]")])
+ assert sql.Literal(obj).as_string(conn) == f"'{{[a,z]}}'::\"{name}_multirange\""
+ cur = conn.execute(sql.SQL("select {}").format(obj))
+ assert cur.fetchone()[0] == obj
import pytest
-from psycopg import pq
+from psycopg import pq, sql
from psycopg import errors as e
-from psycopg.sql import Identifier
from psycopg.adapt import PyFormat
from psycopg.types import range as range_module
from psycopg.types.range import Range, RangeInfo, register_range
+eur = "\u20ac"
type2sub = {
"int4range": "int4",
fetch_cases = [
("testrange", "text"),
("testschema.testrange", "float8"),
- (Identifier("testrange"), "text"),
- (Identifier("testschema", "testrange"), "float8"),
+ (sql.Identifier("testrange"), "text"),
+ (sql.Identifier("testschema", "testrange"), "float8"),
]
def test_no_info_error(conn):
with pytest.raises(TypeError, match="range"):
register_range(None, conn) # type: ignore[arg-type]
+
+
+@pytest.mark.parametrize("name", ["a-b", f"{eur}", "order"])
+def test_literal_invalid_name(conn, name):
+ conn.execute("set client_encoding to utf8")
+ conn.execute(f'create type "{name}" as range (subtype = text)')
+ info = RangeInfo.fetch(conn, f'"{name}"')
+ register_range(info, conn)
+ obj = Range("a", "z", "[]")
+ assert sql.Literal(obj).as_string(conn) == f"'[a,z]'::\"{name}\""
+ cur = conn.execute(sql.SQL("select {}").format(obj))
+ assert cur.fetchone()[0] == obj