import re
import struct
from decimal import Decimal
-from typing import Any, Callable, Iterator, List, Optional, Set, Tuple, Type
-from typing import cast
+from typing import Any, cast, Callable, Iterator, List
+from typing import Optional, Pattern, Set, Tuple, Type
+from functools import lru_cache
from .. import pq
from .. import errors as e
class ArrayLoader(BaseArrayLoader):
format = pq.Format.TEXT
-
- # Tokenize an array representation into item and brackets
- # TODO: currently recognise only , as delimiter. Should be configured
- _re_parse = re.compile(
- br"""(?xi)
- ( [{}] # open or closed bracket
- | " (?: [^"\\] | \\. )* " # or a quoted string
- | [^"{},\\]+ # or an unquoted non-empty string
- ) ,?
- """
- )
+ delimiter = b","
def load(self, data: Buffer) -> List[Any]:
rv = None
stack: List[Any] = []
cast = self._tx.get_loader(self.base_oid, self.format).load
- for m in self._re_parse.finditer(data):
+ re_parse = _get_array_parse_regexp(self.delimiter)
+ for m in re_parse.finditer(data):
t = m.group(1)
if t == b"{":
a: List[Any] = []
_re_unescape = re.compile(br"\\(.)")
+@lru_cache()
+def _get_array_parse_regexp(delimiter: bytes) -> Pattern[bytes]:
+ """
+ Return a regexp to tokenize an array representation into item and brackets
+ """
+ return re.compile(
+ br"""(?xi)
+ ( [{}] # open or closed bracket
+ | " (?: [^"\\] | \\. )* " # or a quoted string
+ | [^"{}%s\\]+ # or an unquoted non-empty string
+ ) ,?
+ """
+ % delimiter
+ )
+
+
class ArrayBinaryLoader(BaseArrayLoader):
format = pq.Format.BINARY
info: TypeInfo, context: Optional[AdaptContext] = None
) -> None:
adapters = context.adapters if context else postgres.adapters
- for base in (ArrayLoader, ArrayBinaryLoader):
- lname = f"{info.name.title()}{base.__name__}"
- loader: Type[BaseArrayLoader] = type(
- lname, (base,), {"base_oid": info.oid}
- )
- adapters.register_loader(info.array_oid, loader)
+
+ base: Type[BaseArrayLoader] = ArrayLoader
+ lname = f"{info.name.title()}{base.__name__}"
+ attribs = {
+ "base_oid": info.oid,
+ "delimiter": info.delimiter.encode("utf-8"),
+ }
+ loader = type(lname, (base,), attribs)
+ adapters.register_loader(info.array_oid, loader)
+
+ base = ArrayBinaryLoader
+ lname = f"{info.name.title()}{base.__name__}"
+ attribs = {"base_oid": info.oid}
+ loader = type(lname, (base,), attribs)
+ adapters.register_loader(info.array_oid, loader)
def register_default_adapters(context: AdaptContext) -> None:
registered all the base loaders.
"""
for t in context.adapters.types:
- # TODO: handle different delimiters (box)
- if t.array_oid and getattr(t, "delimiter", None) == ",":
+ if t.array_oid:
t.register(context)
)
cur.execute("select data from test order by id")
assert cur.fetchall() == [([1.0],), ([],)]
+
+
+def test_array_no_comma_separator(conn):
+ cur = conn.execute("select '{(2,2),(1,1);(5,6),(3,4)}'::box[]")
+ # Not parsed at the moment, but split ok on ; separator
+ assert cur.fetchone()[0] == ["(2,2),(1,1)", "(5,6),(3,4)"]