self._fmod = res.fmod(index)
self._fsize = res.fsize(index)
- _attrs = tuple(
- attrgetter(attr)
- for attr in """
+ _attrs = tuple(attrgetter(attr) for attr in """
name type_code display_size internal_size precision scale null_ok
- """.split()
- )
+ """.split())
def __repr__(self) -> str:
return (
else:
f = _query2pg_nocache
- (self.query, self._want_formats, self._order, self._parts) = f(
+ self.query, self._want_formats, self._order, self._parts = f(
query, self._tx.encoding
)
else:
else:
f = _query2pg_client_nocache
- (self.template, self._order, self._parts) = f(query, self._tx.encoding)
+ self.template, self._order, self._parts = f(query, self._tx.encoding)
else:
self.query = query
self._order = None
_query2pg_client = lru_cache(_query2pg_client_nocache)
-_re_placeholder = re.compile(
- rb"""(?x)
+_re_placeholder = re.compile(rb"""(?x)
% # a literal %
(?:
(?:
|
(?:.) # or any char, really
)
- """
-)
+ """)
def _split_query(
@classmethod
def _get_info_query(cls, conn: BaseConnection[Any]) -> QueryNoTemplate:
- return sql.SQL(
- """\
+ return sql.SQL("""\
SELECT
typname AS name, oid, typarray AS array_oid,
oid::regtype::text AS regtype, typdelim AS delimiter
FROM pg_type t
WHERE t.oid = {regtype}
ORDER BY t.oid
-"""
- ).format(regtype=cls._to_regtype(conn))
+""").format(regtype=cls._to_regtype(conn))
@classmethod
def _has_to_regtype_function(cls, conn: BaseConnection[Any]) -> bool:
raise ImportError(f"requested psycopg implementation '{impl}' unknown")
else:
sattempts = "\n".join(f"- {attempt}" for attempt in attempts)
- raise ImportError(
- f"""\
+ raise ImportError(f"""\
no pq wrapper available.
Attempts made:
-{sattempts}"""
- )
+{sattempts}""")
import_from_libpq()
they are empty strings, contain curly braces, delimiter characters,
double quotes, backslashes, or white space, or match the word NULL.
"""
- return re.compile(
- rb"""(?xi)
+ return re.compile(rb"""(?xi)
^$ # the empty string
| ["{}%s\\\s] # or a char to escape
| ^null$ # or the word NULL
- """
- % delimiter
- )
+ """ % delimiter)
class ListBinaryDumper(BaseListDumper):
"""
Return a regexp to tokenize an array representation into items and brackets
"""
- return re.compile(
- rb"""(?xi)
+ return re.compile(rb"""(?xi)
( [{}] # open or closed bracket
| " (?: [^"\\] | \\. )* " # or a quoted string
| [^"{}%s\\]+ # or an unquoted non-empty string
) ,?
- """
- % delimiter
- )
+ """ % delimiter)
def _load_binary(data: Buffer, tx: Transformer) -> list[Any]:
@classmethod
def _get_info_query(cls, conn: BaseConnection[Any]) -> abc.QueryNoTemplate:
- return sql.SQL(
- """\
+ return sql.SQL("""\
SELECT
t.typname AS name, t.oid AS oid, t.typarray AS array_oid,
t.oid::regtype::text AS regtype,
GROUP BY attrelid
) a ON a.attrelid = t.typrelid
WHERE t.oid = {regtype}
-"""
- ).format(regtype=cls._to_regtype(conn))
+""").format(regtype=cls._to_regtype(conn))
class TupleDumper(RecursiveDumper):
return record
-_re_tokenize = re.compile(
- rb"""(?x)
+_re_tokenize = re.compile(rb"""(?x)
(,) # an empty token, representing NULL
| " ((?: [^"] | "")*) " ,? # or a quoted string
| ([^",)]+) ,? # or an unquoted string
- """
-)
+ """)
_re_undouble = re.compile(rb'(["\\])\1')
class TimetzLoader(Loader):
- _re_format = re.compile(
- rb"""(?ix)
+ _re_format = re.compile(rb"""(?ix)
^
(\d+) : (\d+) : (\d+) (?: \. (\d+) )? # Time and micros
([-+]) (\d+) (?: : (\d+) )? (?: : (\d+) )? # Timezone
$
- """
- )
+ """)
def load(self, data: Buffer) -> time:
if not (m := self._re_format.match(data)):
class TimestampLoader(Loader):
- _re_format = re.compile(
- rb"""(?ix)
+ _re_format = re.compile(rb"""(?ix)
^
(\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+) # Date
(?: T | [^a-z0-9] ) # Separator, including T
(\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+) # Time
(?: \.(\d+) )? # Micros
$
- """
- )
- _re_format_pg = re.compile(
- rb"""(?ix)
+ """)
+ _re_format_pg = re.compile(rb"""(?ix)
^
[a-z]+ [^a-z0-9] # DoW, separator
(\d+|[a-z]+) [^a-z0-9] # Month or day
(?: \.(\d+) )? # Micros
[^a-z0-9] (\d+) # Year
$
- """
- )
+ """)
_ORDER_YMD = 0
_ORDER_DMY = 1
class TimestamptzLoader(Loader):
- _re_format = re.compile(
- rb"""(?ix)
+ _re_format = re.compile(rb"""(?ix)
^
(\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+) # Date
(?: T | [^a-z0-9] ) # Separator, including T
(?: \.(\d+) )? # Micros
([-+]) (\d+) (?: : (\d+) )? (?: : (\d+) )? # Timezone
$
- """
- )
+ """)
def __init__(self, oid: int, context: AdaptContext | None = None):
super().__init__(oid, context)
@classmethod
def _get_info_query(cls, conn: BaseConnection[Any]) -> QueryNoTemplate:
- return sql.SQL(
- """\
+ return sql.SQL("""\
SELECT name, oid, array_oid, array_agg(label) AS labels
FROM (
SELECT
ORDER BY e.enumsortorder
) x
GROUP BY name, oid, array_oid
-"""
- ).format(regtype=cls._to_regtype(conn))
+""").format(regtype=cls._to_regtype(conn))
class _BaseEnumLoader(Loader, Generic[E]):
raise e.NotSupportedError(
"multirange types are only available from PostgreSQL 14"
)
- return sql.SQL(
- """\
+ return sql.SQL("""\
SELECT t.typname AS name, t.oid AS oid, t.typarray AS array_oid,
t.oid::regtype::text AS regtype,
r.rngtypid AS range_oid, r.rngsubtype AS subtype_oid
FROM pg_type t
JOIN pg_range r ON t.oid = r.rngmultitypid
WHERE t.oid = {regtype}
-"""
- ).format(regtype=cls._to_regtype(conn))
+""").format(regtype=cls._to_regtype(conn))
def _added(self, registry: TypesRegistry) -> None:
# Map multiranges ranges and subtypes to info
@classmethod
def _get_info_query(cls, conn: BaseConnection[Any]) -> QueryNoTemplate:
- return sql.SQL(
- """\
+ return sql.SQL("""\
SELECT t.typname AS name, t.oid AS oid, t.typarray AS array_oid,
t.oid::regtype::text AS regtype,
r.rngsubtype AS subtype_oid
FROM pg_type t
JOIN pg_range r ON t.oid = r.rngtypid
WHERE t.oid = {regtype}
-"""
- ).format(regtype=cls._to_regtype(conn))
+""").format(regtype=cls._to_regtype(conn))
def _added(self, registry: TypesRegistry) -> None:
# Map ranges subtypes to info
]
dev = [
"ast-comments >= 1.1.2",
- "black >= 24.1.0",
+ "black >= 26.1.0",
"codespell >= 2.2",
"cython-lint >= 0.16",
"dnspython >= 2.1",
@pytest.fixture(scope="session")
def _execmany(svcconn):
- svcconn.execute(
- """
+ svcconn.execute("""
drop table if exists execmany;
create table execmany (id serial primary key, num integer, data text)
- """
- )
+ """)
@pytest.fixture(scope="function")
pytest-randomly == 3.5.0
# From the 'dev' extra
-black == 24.1.0
+black == 26.1.0
dnspython == 2.1.0
flake8 == 4.0.0
types-setuptools == 57.4.0
copy.write_row((i, None, chr(i)))
copy.write_row((ord(eur), None, eur))
- cur.execute(
- """
+ cur.execute("""
select col1 = ascii(data), col2 is null, length(data), count(*)
from copy_in group by 1, 2, 3
-"""
- )
+""")
data = cur.fetchall()
assert data == [(True, True, 1, 256)]
await copy.write_row((i, None, chr(i)))
await copy.write_row((ord(eur), None, eur))
- await cur.execute(
- """
+ await cur.execute("""
select col1 = ascii(data), col2 is null, length(data), count(*)
from copy_in group by 1, 2, 3
-"""
- )
+""")
data = await cur.fetchall()
assert data == [(True, True, 1, 256)]
def test_version(mypy):
- cp = mypy.run_on_source(
- """\
+ cp = mypy.run_on_source("""\
from psycopg_pool import __version__
assert __version__
-"""
- )
+""")
assert not cp.stdout
@pytest.mark.crdb_skip("server-side cursor")
def test_send_describe_portal(pgconn):
- res = pgconn.exec_(
- b"""
+ res = pgconn.exec_(b"""
begin;
declare cur cursor for select * from generate_series(1,10) foo;
- """
- )
+ """)
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
pgconn.send_describe_portal(b"cur")
@pytest.mark.libpq(">= 17")
@pytest.mark.crdb_skip("close portal")
def test_send_close_portal(pgconn):
- res = pgconn.exec_(
- b"""
+ res = pgconn.exec_(b"""
begin;
declare cur cursor for select * from generate_series(1,10) foo;
- """
- )
+ """)
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
pgconn.send_close_portal(b"cur")
for i in range(10):
data = []
for j in range(20):
- data.append(
- f"""\
+ data.append(f"""\
{i * 20 + j}\t{j}\t{'X' * (i * 20 + j)}
-"""
- )
+""")
rv = pgconn.put_copy_data("".join(data).encode("ascii"))
assert rv > 0
for i in range(10):
data = []
for j in range(20):
- data.append(
- f"""\
+ data.append(f"""\
{i * 20 + j}\thardly a number\tnope
-"""
- )
+""")
rv = pgconn.put_copy_data("".join(data).encode("ascii"))
assert rv > 0
for i in range(10):
data = []
for j in range(20):
- data.append(
- f"""\
+ data.append(f"""\
{i * 20 + j}\t{j}\t{'X' * (i * 20 + j)}
-"""
- )
+""")
rv = pgconn.put_copy_data("".join(data).encode("ascii"))
assert rv > 0
@pytest.mark.crdb_skip("close portal")
def test_describe_portal(pgconn):
- res = pgconn.exec_(
- b"""
+ res = pgconn.exec_(b"""
begin;
declare cur cursor for select * from generate_series(1,10) foo;
- """
- )
+ """)
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
res = pgconn.describe_portal(b"cur")
@pytest.mark.crdb_skip("close portal")
@pytest.mark.libpq(">= 17")
def test_close_portal(pgconn):
- res = pgconn.exec_(
- b"""
+ res = pgconn.exec_(b"""
begin;
declare cur cursor for select * from generate_series(1,10) foo;
- """
- )
+ """)
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
res = pgconn.close_portal(b"cur")
@pytest.mark.crdb("skip", reason="ftable")
def test_ftable_and_col(pgconn):
- res = pgconn.exec_(
- b"""
+ res = pgconn.exec_(b"""
drop table if exists t1, t2;
create table t1 as select 1 as f1;
create table t2 as select 2 as f2, 3 as f3;
- """
- )
+ """)
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
res = pgconn.exec_(
#!/usr/bin/env python
"""Copy operation micro-benchmarks."""
+
from __future__ import annotations
import sys
fields = sql.SQL(", ").join(
[sql.SQL(f"f{i} text") for i in range(self.args.nfields)]
)
- stmt = sql.SQL(
- """\
+ stmt = sql.SQL("""\
create temp table testcopy (id serial primary key, {})
-"""
- ).format(fields)
+""").format(fields)
return stmt
def get_copy_stmt(self) -> Query:
fields = sql.SQL(", ").join(
[sql.Identifier(f"f{i}") for i in range(self.args.nfields)]
)
- stmt = sql.SQL(
- """\
+ stmt = sql.SQL("""\
copy testcopy ({}) from stdin
-"""
- ).format(fields)
+""").format(fields)
return stmt
def get_record(self) -> tuple[Any, ...]:
Welcome-To-The-Jungle.md
"""
+
# mypy: allow-untyped-defs
# mypy: allow-untyped-calls
def test_description_attribs(conn):
curs = conn.cursor()
- curs.execute(
- """select
+ curs.execute("""select
3.14::decimal(10,2) as pi,
'hello'::text as hi,
'2010-02-18'::date as now
- """
- )
+ """)
assert len(curs.description) == 3
for c in curs.description:
len(c) == 7 # DBAPI happy
- for i, a in enumerate(
- """
+ for i, a in enumerate("""
name type_code display_size internal_size precision scale null_ok
- """.split()
- ):
+ """.split()):
assert c[i] == getattr(c, a)
# Won't fill them up
def test_pickle(conn):
curs = conn.cursor()
- curs.execute(
- """select
+ curs.execute("""select
3.14::decimal(10,2) as pi,
'hello'::text as hi,
'2010-02-18'::date as now
- """
- )
+ """)
description = curs.description
pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL)
unpickled = pickle.loads(pickled)
@pytest.mark.crdb_skip("deferrable")
def test_commit_error(conn):
- conn.execute(
- """
+ conn.execute("""
drop table if exists selfref;
create table selfref (
x serial primary key,
y int references selfref (x) deferrable initially deferred)
- """
- )
+ """)
conn.commit()
conn.execute("insert into selfref (y) values (-1)")
@pytest.mark.crdb_skip("deferrable")
async def test_commit_error(aconn):
- await aconn.execute(
- """
+ await aconn.execute("""
drop table if exists selfref;
create table selfref (
x serial primary key,
y int references selfref (x) deferrable initially deferred)
- """
- )
+ """)
await aconn.commit()
await aconn.execute("insert into selfref (y) values (-1)")
@pytest.mark.parametrize("typetype", ["names", "oids"])
def test_read_rows(conn, format, typetype):
cur = conn.cursor()
- with cur.copy(
- """copy (
+ with cur.copy("""
+ copy (
select 10::int4, 'hello'::text, '{0.0,1.0}'::float8[]
- ) to stdout (format %s)"""
- % format.name
- ) as copy:
+ ) to stdout (format %s)""" % format.name) as copy:
copy.set_types(["int4", "text", "float8[]"])
row = copy.read_row()
assert copy.read_row() is None
copy.write_row((i, None, chr(i)))
copy.write_row((ord(eur), None, eur))
- cur.execute(
- """
+ cur.execute("""
select col1 = ascii(data), col2 is null, length(data), count(*)
from copy_in group by 1, 2, 3
-"""
- )
+""")
data = cur.fetchall()
assert data == [(True, True, 1, 256)]
@pytest.mark.parametrize("typetype", ["names", "oids"])
async def test_read_rows(aconn, format, typetype):
cur = aconn.cursor()
- async with cur.copy(
- """copy (
+ async with cur.copy("""
+ copy (
select 10::int4, 'hello'::text, '{0.0,1.0}'::float8[]
- ) to stdout (format %s)"""
- % format.name
- ) as copy:
+ ) to stdout (format %s)""" % format.name) as copy:
copy.set_types(["int4", "text", "float8[]"])
row = await copy.read_row()
assert (await copy.read_row()) is None
await copy.write_row((i, None, chr(i)))
await copy.write_row((ord(eur), None, eur))
- await cur.execute(
- """
+ await cur.execute("""
select col1 = ascii(data), col2 is null, length(data), count(*)
from copy_in group by 1, 2, 3
-"""
- )
+""")
data = await cur.fetchall()
assert data == [(True, True, 1, 256)]
def test_rownumber_mixed(conn):
cur = conn.cursor()
- cur.execute(
- """
+ cur.execute("""
select x from generate_series(1, 3) x;
set timezone to utc;
select x from generate_series(4, 6) x;
-"""
- )
+""")
assert cur.rownumber == 0
assert cur.fetchone() == (1,)
assert cur.rownumber == 1
# With no result
cur.adapters.register_loader("text", make_loader("1"))
- cur.execute(
- """
+ cur.execute("""
values ('foo'::text);
values ('bar'::text), ('baz');
values ('qux'::text);
- """
- )
+ """)
assert cur.fetchall() == [("foo1",)]
cur.nextset()
async def test_rownumber_mixed(aconn):
cur = aconn.cursor()
- await cur.execute(
- """
+ await cur.execute("""
select x from generate_series(1, 3) x;
set timezone to utc;
select x from generate_series(4, 6) x;
-"""
- )
+""")
assert cur.rownumber == 0
assert await cur.fetchone() == (1,)
assert cur.rownumber == 1
# With no result
cur.adapters.register_loader("text", make_loader("1"))
- await cur.execute(
- """
+ await cur.execute("""
values ('foo'::text);
values ('bar'::text), ('baz');
values ('qux'::text);
- """
- )
+ """)
assert (await cur.fetchall()) == [("foo1",)]
cur.nextset()
def test_diag_attr_values(conn):
if is_crdb(conn):
conn.execute("set experimental_enable_temp_tables = 'on'")
- conn.execute(
- """
+ conn.execute("""
create temp table test_exc (
data int constraint chk_eq1 check (data = 1)
- )"""
- )
+ )""")
with pytest.raises(e.Error) as exc:
conn.execute("insert into test_exc values(2)")
diag = exc.value.diag
conn.execute(f"set client_encoding to {enc}")
cur = conn.cursor()
with pytest.raises(e.DatabaseError) as excinfo:
- cur.execute(
- """
+ cur.execute("""
do $$begin
execute format('insert into "%s" values (1)', chr(8364));
end$$ language plpgsql;
- """
- )
+ """)
diag = excinfo.value.diag
assert diag.message_primary and f'"{eur}"' in diag.message_primary
@pytest.mark.crdb_skip("deferrable")
def test_diag_from_commit(conn):
cur = conn.cursor()
- cur.execute(
- """
+ cur.execute("""
create temp table test_deferred (
data int primary key,
ref int references test_deferred (data)
deferrable initially deferred)
- """
- )
+ """)
cur.execute("insert into test_deferred values (1,2)")
with pytest.raises(e.Error) as exc:
conn.commit()
@pytest.mark.crdb_skip("deferrable")
async def test_diag_from_commit_async(aconn):
cur = aconn.cursor()
- await cur.execute(
- """
+ await cur.execute("""
create temp table test_deferred (
data int primary key,
ref int references test_deferred (data)
deferrable initially deferred)
- """
- )
+ """)
await cur.execute("insert into test_deferred values (1,2)")
with pytest.raises(e.Error) as exc:
await aconn.commit()
e.lookup(code)
with pytest.raises(e.ProgrammingError) as excinfo:
- conn.execute(
- f"""
+ conn.execute(f"""
do $$begin
raise exception 'made up code' using errcode = '{code}';
end$$ language plpgsql
- """
- )
+ """)
exc = excinfo.value
assert exc.diag.sqlstate == code
assert exc.sqlstate == code
def test_version_static(mypy):
- cp = mypy.run_on_source(
- """\
+ cp = mypy.run_on_source("""\
from psycopg import __version__
assert __version__
-"""
- )
+""")
assert not cp.stdout
# can be psycopg_c, psycopg_binary
cpackage = _psycopg.__name__.split(".")[0]
- cp = mypy.run_on_source(
- f"""\
+ cp = mypy.run_on_source(f"""\
from {cpackage} import __version__
assert __version__
-"""
- )
+""")
assert not cp.stdout
@pytest.mark.crdb_skip("deferrable")
def test_error_on_commit(conn):
- conn.execute(
- """
+ conn.execute("""
drop table if exists selfref;
create table selfref (
x serial primary key,
y int references selfref (x) deferrable initially deferred)
- """
- )
+ """)
conn.commit()
with conn.pipeline():
def test_executemany_no_returning(conn):
conn.set_autocommit(True)
conn.execute("drop table if exists execmanypipelinenoreturning")
- conn.execute(
- """create unlogged table execmanypipelinenoreturning
- (id serial primary key, num integer)"""
- )
+ conn.execute("""
+ create unlogged table execmanypipelinenoreturning
+ (id serial primary key, num integer)""")
with conn.pipeline(), conn.cursor() as cur:
cur.executemany(
"insert into execmanypipelinenoreturning(num) values (%s)",
conn.execute("drop table if exists pipeline_concurrency")
conn.execute("drop table if exists accessed")
with conn.transaction():
- conn.execute(
- """create unlogged table pipeline_concurrency (
+ conn.execute("""
+ create unlogged table pipeline_concurrency (
id serial primary key,
- value integer)"""
- )
+ value integer)""")
conn.execute("create unlogged table accessed as (select now() as value)")
def update(value):
@pytest.mark.crdb_skip("deferrable")
async def test_error_on_commit(aconn):
- await aconn.execute(
- """
+ await aconn.execute("""
drop table if exists selfref;
create table selfref (
x serial primary key,
y int references selfref (x) deferrable initially deferred)
- """
- )
+ """)
await aconn.commit()
async with aconn.pipeline():
async def test_executemany_no_returning(aconn):
await aconn.set_autocommit(True)
await aconn.execute("drop table if exists execmanypipelinenoreturning")
- await aconn.execute(
- """create unlogged table execmanypipelinenoreturning
- (id serial primary key, num integer)"""
- )
+ await aconn.execute("""
+ create unlogged table execmanypipelinenoreturning
+ (id serial primary key, num integer)""")
async with aconn.pipeline(), aconn.cursor() as cur:
await cur.executemany(
"insert into execmanypipelinenoreturning(num) values (%s)",
await aconn.execute("drop table if exists pipeline_concurrency")
await aconn.execute("drop table if exists accessed")
async with aconn.transaction():
- await aconn.execute(
- """create unlogged table pipeline_concurrency (
+ await aconn.execute("""
+ create unlogged table pipeline_concurrency (
id serial primary key,
- value integer)"""
- )
+ value integer)""")
await aconn.execute("create unlogged table accessed as (select now() as value)")
async def update(value):
def test_execute(self, conn):
cur = conn.cursor()
- cur.execute(
- """
+ cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
- """
- )
+ """)
cur.execute(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier("test_compose"),
def test_executemany(self, conn):
cur = conn.cursor()
- cur.execute(
- """
+ cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
- """
- )
+ """)
cur.executemany(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier("test_compose"),
@pytest.mark.crdb_skip("copy")
def test_copy(self, conn):
cur = conn.cursor()
- cur.execute(
- """
+ cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
- """
- )
+ """)
with cur.copy(
sql.SQL("copy {t} (id, foo, bar, {f}) from stdin").format(
def test_invalid_name(self, conn, name):
if conn.info.parameter_status("is_superuser") != "on":
pytest.skip("not a superuser")
- conn.execute(
- f"""
+ conn.execute(f"""
set client_encoding to utf8;
create type "{name}";
create function invin(cstring) returns "{name}"
create function invout("{name}") returns cstring
language internal immutable strict as 'textout';
create type "{name}" (input=invin, output=invout, like=text);
- """
- )
+ """)
info = TypeInfo.fetch(conn, f'"{name}"')
class InvDumper(StrDumper):
info = TypeInfo.fetch(conn, name)
assert info.name == "testtype"
# assert info.schema == "testschema"
- cur = conn.execute(
- """
+ cur = conn.execute("""
select oid, typarray from pg_type
where oid = 'testschema.testtype'::regtype
- """
- )
+ """)
assert cur.fetchone() == (info.oid, info.array_oid)
def test_dump_tuple(conn, rec, obj):
cur = conn.cursor()
fields = [f"f{i} text" for i in range(len(obj))]
- cur.execute(
- f"""
+ cur.execute(f"""
drop type if exists tmptype;
create type tmptype as ({', '.join(fields)});
- """
- )
+ """)
info = CompositeInfo.fetch(conn, "tmptype")
register_composite(info, conn)
def test_dump_tuple_null(conn):
cur = conn.cursor()
- cur.execute(
- """
+ cur.execute("""
drop type if exists tmptype;
create type tmptype as (f1 text, f2 text);
- """
- )
+ """)
info = CompositeInfo.fetch(conn, "tmptype")
register_composite(info, conn)
conn.adapters.register_dumper(str, StrNoneDumper)
@pytest.mark.parametrize("fmt_in", PyFormat)
def test_dump_builtin_empty_range(conn, fmt_in):
- conn.execute(
- """
+ conn.execute("""
drop type if exists tmptype;
create type tmptype as (num integer, range daterange, nums integer[])
- """
- )
+ """)
info = CompositeInfo.fetch(conn, "tmptype")
register_composite(info, conn)
if is_crdb(svcconn):
pytest.skip(crdb_skip_message("composite"))
cur = svcconn.cursor()
- cur.execute(
- """
+ cur.execute("""
create schema if not exists testschema;
drop type if exists testcomp2 cascade;
create type testcomp as (foo text, bar int8, baz float8);
create type testcomp2 as (qux int8, quux testcomp);
create type testschema.testcomp as (foo text, bar int8, qux bool);
- """
- )
+ """)
return CompositeInfo.fetch(svcconn, "testcomp")
def test_invalid_fields_names(conn):
conn.execute("set client_encoding to utf8")
- conn.execute(
- f"""
+ conn.execute(f"""
create type "a-b" as ("c-d" text, "{eur}" int);
create type "-x-{eur}" as ("w-ww" "a-b", "0" int);
- """
- )
+ """)
ab = CompositeInfo.fetch(conn, '"a-b"')
x = CompositeInfo.fetch(conn, f'"-x-{eur}"')
register_composite(ab, conn)
@pytest.mark.crdb_skip("copy")
def test_load_copy(self, conn):
cur = conn.cursor(binary=False)
- with cur.copy(
- """
+ with cur.copy("""
copy (
select
'2000-01-01 01:02:03.123456-10:20'::timestamptz,
'11111111'::int4
) to stdout
- """
- ) as copy:
+ """) as copy:
copy.set_types(["timestamptz", "int4"])
rec = copy.read_row()
@pytest.mark.crdb_skip("copy")
def test_load_copy(self, conn):
cur = conn.cursor(binary=False)
- with cur.copy(
- """
+ with cur.copy("""
copy (
select
'01:02:03.123456-10:20'::timetz,
'11111111'::int4
) to stdout
- """
- ) as copy:
+ """) as copy:
copy.set_types(["timetz", "int4"])
rec = copy.read_row()
@pytest.mark.crdb_skip("copy")
def test_load_copy(self, conn):
cur = conn.cursor(binary=False)
- with cur.copy(
- """
+ with cur.copy("""
copy (
select
'1 days +00:00:01.000001'::interval,
'foo bar'::text
) to stdout
- """
- ) as copy:
+ """) as copy:
copy.set_types(["interval", "text"])
rec = copy.read_row()
def ensure_enum(enum, conn):
name = enum.__name__.lower()
labels = list(enum.__members__)
- conn.execute(
- sql.SQL(
- """
+ conn.execute(sql.SQL("""
drop type if exists {name};
create type {name} as enum ({labels});
- """
- ).format(name=sql.Identifier(name), labels=sql.SQL(",").join(labels))
- )
+ """).format(name=sql.Identifier(name), labels=sql.SQL(",").join(labels)))
return name, enum, labels
if is_crdb(conn):
pytest.skip(crdb_skip_message("range"))
- conn.execute(
- """
+ conn.execute("""
create schema if not exists testschema;
drop type if exists testrange cascade;
create type testrange as range (subtype = text, collation = "C");
create type testschema.testrange as range (subtype = float8);
- """
- )
+ """)
fetch_cases = [
self.package = package
self.bump_level = BumpLevel(bump_level) if bump_level else None
- self._ini_regex = re.compile(
- r"""(?ix)
+ self._ini_regex = re.compile(r"""(?ix)
^
(?P<pre> version \s* = \s* ")
(?P<ver> [^\s"]+)
(?P<post> " \s*)
\s* $
- """
- )
- self._extra_regex = re.compile(
- r"""(?ix)
+ """)
+ self._extra_regex = re.compile(r"""(?ix)
^
(?P<pre> \s* ")
(?P<package> [^\s]+)
(?P<ver> [^\s]+)
(?P<post> \s* (?:;.* \s*))
\s* $
- """
- )
+ """)
@cached_property
def current_version(self) -> Version:
"XX000": "InternalError_",
}
- seen = set(
- """
+ seen = set("""
Error Warning InterfaceError DataError DatabaseError ProgrammingError
IntegrityError InternalError NotSupportedError OperationalError
- """.split()
- )
+ """.split())
for c, cerrs in errors.items():
for sqstate, errlabel in list(cerrs.items()):
def get_py_oids(conn: Connection) -> list[str]:
lines = []
- for typname, oid in conn.execute(
- """
+ for typname, oid in conn.execute("""
select typname, oid
from pg_type
where
and (typtype = any('{b,r,m}') or typname = 'record')
and (typname !~ '^(_|pg_)' or typname = 'pg_lsn')
order by typname
-"""
- ):
+"""):
const_name = typname.upper() + "_OID"
lines.append(f"{const_name} = {oid}")
# Note: "record" is a pseudotype but still a useful one to have.
# "pg_lsn" is a documented public type and useful in streaming replication
lines = []
- for typname, oid, typarray, regtype, typdelim in conn.execute(
- """
+ for typname, oid, typarray, regtype, typdelim in conn.execute("""
select typname, oid, typarray,
-- CRDB might have quotes in the regtype representation
replace(typname::regtype::text, '''', '') as regtype,
and (typtype = 'b' or typname = 'record')
and (typname !~ '^(_|pg_)' or typname = 'pg_lsn')
order by typname
-"""
- ):
+"""):
typemod = typemods.get(typname)
# Weird legacy type in postgres catalog
def get_py_ranges(conn: Connection) -> list[str]:
lines = []
- for typname, oid, typarray, rngsubtype in conn.execute(
- """
+ for typname, oid, typarray, rngsubtype in conn.execute("""
select typname, oid, typarray, rngsubtype
from
pg_type t
oid < 10000
and typtype = 'r'
order by typname
-"""
- ):
+"""):
params = [f"{typname!r}, {oid}, {typarray}, subtype_oid={rngsubtype}"]
lines.append(f"RangeInfo({','.join(params)}),")
def get_py_multiranges(conn: Connection) -> list[str]:
lines = []
- for typname, oid, typarray, rngtypid, rngsubtype in conn.execute(
- """
+ for typname, oid, typarray, rngtypid, rngsubtype in conn.execute("""
select typname, oid, typarray, rngtypid, rngsubtype
from
pg_type t
oid < 10000
and typtype = 'm'
order by typname
-"""
- ):
+"""):
params = [
f"{typname!r}, {oid}, {typarray},"
f" range_oid={rngtypid}, subtype_oid={rngsubtype}"
def get_cython_oids(conn: Connection) -> list[str]:
lines = []
- for typname, oid in conn.execute(
- """
+ for typname, oid in conn.execute("""
select typname, oid
from pg_type
where
and (typtype = any('{b,r,m}') or typname = 'record')
and (typname !~ '^(_|pg_)' or typname = 'pg_lsn')
order by typname
-"""
- ):
+"""):
const_name = typname.upper() + "_OID"
lines.append(f" {const_name} = {oid}")