stack: List[Any] = []
cast = self._tx.get_loader(self.base_oid, self.format).load
+ # Remove the dimensions information prefix (``[...]=``)
+ if data and data[0] == b"["[0]:
+ if isinstance(data, memoryview):
+ data = bytes(data)
+ idx = data.find(b"=")
+ if idx == -1:
+ raise e.DataError("malformed array, no '=' after dimension information")
+ data = data[idx + 1 :]
+
re_parse = _get_array_parse_regexp(self.delimiter)
for m in re_parse.finditer(data):
t = m.group(1)
cur = conn.execute("select '{(2,2),(1,1);(5,6),(3,4)}'::box[]")
# Not parsed at the moment, but split ok on ; separator
assert cur.fetchone()[0] == ["(2,2),(1,1)", "(5,6),(3,4)"]
+
+
+@pytest.mark.parametrize("fmt_out", pq.Format)
+@pytest.mark.parametrize(
+ "obj, want",
+ [
+ ("'[0:1]={a,b}'::text[]", ["a", "b"]),
+ ("'[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}'::int[]", [[[1, 2, 3], [4, 5, 6]]]),
+ ],
+)
+def test_array_with_bounds(conn, obj, want, fmt_out):
+ got = conn.execute(f"select {obj}", binary=fmt_out).fetchone()[0]
+ assert got == want
+
+
+@pytest.mark.parametrize("fmt_out", pq.Format)
+def test_all_chars_with_bounds(conn, fmt_out):
+ cur = conn.cursor(binary=fmt_out)
+ for i in range(1, 256):
+ c = chr(i)
+ cur.execute("select '[0:1]={a,b}'::text[] || %s::text[]", ([c],))
+ assert cur.fetchone()[0] == ["a", "b", c]
+
+ a = list(map(chr, range(1, 256)))
+ a.append("\u20ac")
+ cur.execute("select '[0:1]={a,b}'::text[] || %s::text[]", (a,))
+ assert cur.fetchone()[0] == ["a", "b"] + a
+
+ s = "".join(a)
+ cur.execute("select '[0:1]={a,b}'::text[] || %s::text[]", ([s],))
+ assert cur.fetchone()[0] == ["a", "b", s]