tests_str = [
- ([], "{}"),
([[[[[["a"]]]]]], "{{{{{{a}}}}}}"),
([[[[[[None]]]]]], "{{{{{{NULL}}}}}}"),
(["foo", "bar", "baz"], "{foo,bar,baz}"),
]
+@pytest.mark.parametrize("fmt_in", PyFormat)
+@pytest.mark.parametrize("type", ["text", "int4"])
+def test_dump_empty_list(conn, fmt_in, type):
+ cur = conn.cursor()
+ cur.execute(f"select %{fmt_in.value}::{type}[] = %s::{type}[]", ([], "{}"))
+ assert cur.fetchone()[0]
+
+
+@pytest.mark.crdb("skip", reason="nested array")
@pytest.mark.parametrize("fmt_in", PyFormat)
@pytest.mark.parametrize("obj, want", tests_str)
def test_dump_list_str(conn, obj, want, fmt_in):
assert cur.fetchone()[0]
+@pytest.mark.parametrize("fmt_out", pq.Format)
+def test_load_empty_list_str(conn, fmt_out):
+ cur = conn.cursor(binary=fmt_out)
+ cur.execute("select %s::text[]", ([],))
+ assert cur.fetchone()[0] == []
+
+
+@pytest.mark.crdb("skip", reason="nested array")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("want, obj", tests_str)
def test_load_list_str(conn, obj, want, fmt_out):
tests_int = [
- ([], "{}"),
([10, 20, -30], "{10,20,-30}"),
([10, None, 30], "{10,null,30}"),
([[10, 20], [30, 40]], "{{10,20},{30,40}}"),
]
+@pytest.mark.crdb("skip", reason="nested array")
@pytest.mark.parametrize("obj, want", tests_int)
def test_dump_list_int(conn, obj, want):
cur = conn.cursor()
tx.get_dumper(input, PyFormat.BINARY).dump(input)
+@pytest.mark.crdb("skip", reason="nested array")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize("want, obj", tests_int)
def test_load_list_int(conn, obj, want, fmt_out):
assert got == want
+@pytest.mark.crdb("skip", reason="composite")
def test_array_register(conn):
conn.execute("create table mytype (data text)")
cur = conn.execute("""select '(foo)'::mytype, '{"(foo)"}'::mytype[]""")
assert res[1] == ["(foo)"]
+@pytest.mark.crdb("skip", reason="aclitem")
def test_array_of_unknown_builtin(conn):
user = conn.execute("select user").fetchone()[0]
# we cannot load this type, but we understand it is an array
cur = conn.cursor()
cur.execute("create table test (id serial primary key, data date[])")
with conn.transaction():
- cur.execute(f"insert into test (data) values (%{fmt_in.value})", ([],))
+ cur.execute(
+ f"insert into test (data) values (%{fmt_in.value}) returning id", ([],)
+ )
+ id = cur.fetchone()[0]
cur.execute("select data from test")
assert cur.fetchone() == ([],)
# test untyped list in a filter
- cur.execute(f"select data from test where id = any(%{fmt_in.value})", ([1],))
+ cur.execute(f"select data from test where id = any(%{fmt_in.value})", ([id],))
assert cur.fetchone()
cur.execute(f"select data from test where id = any(%{fmt_in.value})", ([],))
assert not cur.fetchone()
assert cur.fetchall() == [([1.0],), ([],)]
+@pytest.mark.crdb("skip", reason="geometric types")
def test_dump_list_no_comma_separator(conn):
class Box:
def __init__(self, x1, y1, x2, y2):
assert got == "{(3,4),(1,2);(5,4),(3,2)}"
+@pytest.mark.crdb("skip", reason="geometric types")
def test_load_array_no_comma_separator(conn):
cur = conn.execute("select '{(2,2),(1,1);(5,6),(3,4)}'::box[]")
# Not parsed at the moment, but split ok on ; separator
assert cur.fetchone()[0] == ["(2,2),(1,1)", "(5,6),(3,4)"]
+@pytest.mark.crdb("skip", reason="array with bounds")
@pytest.mark.parametrize("fmt_out", pq.Format)
@pytest.mark.parametrize(
"obj, want",
assert got == want
+@pytest.mark.crdb("skip", reason="array with bounds")
@pytest.mark.parametrize("fmt_out", pq.Format)
def test_all_chars_with_bounds(conn, fmt_out):
cur = conn.cursor(binary=fmt_out)