]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add timetz binary adapters
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 12 May 2021 20:44:26 +0000 (22:44 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 13 May 2021 00:21:12 +0000 (02:21 +0200)
psycopg3/psycopg3/types/__init__.py
psycopg3/psycopg3/types/date.py
tests/fix_faker.py
tests/types/test_date.py

index 61337276fd20e738a1962948a730637d9c12be7d..abf69b902ddeb9de78f54cad0660dc6a04547ede 100644 (file)
@@ -85,6 +85,7 @@ from .date import (
     TimeDumper as TimeDumper,
     TimeBinaryDumper as TimeBinaryDumper,
     TimeTzDumper as TimeTzDumper,
+    TimeTzBinaryDumper as TimeTzBinaryDumper,
     DateTimeTzDumper as DateTimeTzDumper,
     DateTimeDumper as DateTimeDumper,
     TimeDeltaDumper as TimeDeltaDumper,
@@ -93,6 +94,7 @@ from .date import (
     TimeLoader as TimeLoader,
     TimeBinaryLoader as TimeBinaryLoader,
     TimeTzLoader as TimeTzLoader,
+    TimeTzBinaryLoader as TimeTzBinaryLoader,
     TimestampLoader as TimestampLoader,
     TimestamptzLoader as TimestamptzLoader,
     IntervalLoader as IntervalLoader,
@@ -218,6 +220,7 @@ def register_default_globals(ctx: AdaptContext) -> None:
     TimeLoader.register("time", ctx)
     TimeBinaryLoader.register("time", ctx)
     TimeTzLoader.register("timetz", ctx)
+    TimeTzBinaryLoader.register("timetz", ctx)
     TimestampLoader.register("timestamp", ctx)
     TimestamptzLoader.register("timestamptz", ctx)
     IntervalLoader.register("interval", ctx)
index 427fc75a461e1779c90cf8f1e7ce70ae8f4f4930..762d955fca19f17df261fac3f3efe40fadace5fa 100644 (file)
@@ -7,7 +7,7 @@ Adapters for date/time types.
 import re
 import sys
 import struct
-from datetime import date, datetime, time, timedelta
+from datetime import date, datetime, time, timedelta, timezone
 from typing import Callable, cast, Optional, Tuple, Union
 
 from ..pq import Format
@@ -24,6 +24,11 @@ _pack_int8 = cast(_PackInt, struct.Struct("!q").pack)
 _unpack_int4 = cast(_UnpackInt, struct.Struct("!i").unpack)
 _unpack_int8 = cast(_UnpackInt, struct.Struct("!q").unpack)
 
+_unpack_timetz = cast(
+    Callable[[bytes], Tuple[int, int]], struct.Struct("!qi").unpack
+)
+_pack_timetz = cast(Callable[[int, int], bytes], struct.Struct("!qi").pack)
+
 _pg_date_epoch = date(2000, 1, 1).toordinal()
 _py_date_min = date.min.toordinal()
 _py_date_max = date.max.toordinal()
@@ -110,7 +115,12 @@ class TimeTzBinaryDumper(TimeBinaryDumper):
     _oid = builtins["timetz"].oid
 
     def dump(self, obj: time) -> bytes:
-        raise NotImplementedError
+        ms = obj.microsecond + 1_000_000 * (
+            obj.second + 60 * (obj.minute + 60 * obj.hour)
+        )
+        off = obj.utcoffset()
+        assert off is not None
+        return _pack_timetz(ms, -int(off.total_seconds()))
 
 
 class DateTimeTzDumper(Dumper):
@@ -298,13 +308,7 @@ class TimeTzLoader(TimeLoader):
     _format = "%H:%M:%S.%f%z"
     _format_no_micro = _format.replace(".%f", "")
 
-    def __init__(self, oid: int, context: Optional[AdaptContext] = None):
-        if sys.version_info < (3, 7):
-            setattr(self, "load", self._load_py36)
-
-        super().__init__(oid, context)
-
-    def load(self, data: Buffer) -> time:
+    def _load(self, data: Buffer) -> time:
         if isinstance(data, memoryview):
             data = bytes(data)
 
@@ -330,7 +334,44 @@ class TimeTzLoader(TimeLoader):
         elif data[-9] in (43, 45):  # +-HH:MM:SS -> +-HHMM
             data = data[:-6] + data[-5:-3]
 
-        return TimeTzLoader.load(self, data)
+        return self._load(data)
+
+
+if sys.version_info >= (3, 7):
+    setattr(TimeTzLoader, "load", TimeTzLoader._load)
+else:
+    setattr(TimeTzLoader, "load", TimeTzLoader._load_py36)
+
+
+class TimeTzBinaryLoader(Loader):
+
+    format = Format.BINARY
+
+    def load(self, data: Buffer) -> time:
+        val, off = _unpack_timetz(data)
+
+        val, ms = divmod(val, 1_000_000)
+        val, s = divmod(val, 60)
+        h, m = divmod(val, 60)
+
+        try:
+            return time(h, m, s, ms, self._tz_from_sec(off))
+        except ValueError:
+            raise DataError(f"time not supported by Python: hour={h}")
+
+    def _tz_from_sec(self, sec: int) -> timezone:
+        return timezone(timedelta(seconds=-sec))
+
+    def _tz_from_sec_36(self, sec: int) -> timezone:
+        if sec % 60:
+            sec = round(sec / 60.0) * 60
+        return timezone(timedelta(seconds=-sec))
+
+
+if sys.version_info < (3, 7):
+    setattr(
+        TimeTzBinaryLoader, "_tz_from_sec", TimeTzBinaryLoader._tz_from_sec_36
+    )
 
 
 class TimestampLoader(DateLoader):
index 777c5078064893ba81c33cb2629cac286e4ed5fa..fcc8103abda341c2029f31f09ee5500962179c07 100644 (file)
@@ -134,6 +134,10 @@ class Faker:
                 schema[i] = [scls]
             elif cls is tuple:
                 schema[i] = tuple(self.choose_schema(types=types, ncols=ncols))
+            elif cls is dt.time:
+                # Pick timezone yes/no
+                if choice([True, False]):
+                    schema[i] = TimeTz
 
         return schema
 
@@ -349,6 +353,10 @@ class Faker:
         h, m = divmod(val, 60)
         return dt.time(h, m, s, ms)
 
+    def make_TimeTz(self, spec):
+        rv = self.make_time(spec)
+        return rv.replace(tzinfo=self._make_tz(spec))
+
     def make_UUID(self, spec):
         return UUID(bytes=bytes([randrange(256) for i in range(16)]))
 
@@ -376,11 +384,21 @@ class Faker:
             cls = choice(scal_types)
             return self.make(cls)
 
+    def _make_tz(self, spec):
+        minutes = randrange(-12 * 60, 12 * 60 + 1)
+        return dt.timezone(dt.timedelta(minutes=minutes))
+
 
 class JsonFloat:
     pass
 
 
+class TimeTz(dt.time):
+    """
+    Placeholder to create time objects with tzinfo.
+    """
+
+
 def deep_import(name):
     parts = deque(name.split("."))
     seen = []
index 10c2d8b8e4fe75496e90ecb762c95eadbd2c840c..cdb19affcf800213e4d0397c9c4861351ee57a21 100644 (file)
@@ -355,18 +355,11 @@ def test_load_time_24(conn, fmt_out):
         ("max~+12", "23:59:59.999999+12:00"),
     ],
 )
-def test_dump_timetz(conn, val, expr):
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
+def test_dump_timetz(conn, val, expr, fmt_in):
     cur = conn.cursor()
     cur.execute("set timezone to '-02:00'")
-    cur.execute(f"select '{expr}'::timetz = %s", (as_time(val),))
-    assert cur.fetchone()[0] is True
-
-
-@pytest.mark.xfail  # TODO: binary dump
-@pytest.mark.parametrize("val, expr", [("0,0~0", "00:00Z")])
-def test_dump_timetz_binary(conn, val, expr):
-    cur = conn.cursor()
-    cur.execute(f"select '{expr}'::time = %b", (as_time(val),))
+    cur.execute(f"select '{expr}'::timetz = %{fmt_in}", (as_time(val),))
     assert cur.fetchone()[0] is True
 
 
@@ -381,23 +374,16 @@ def test_dump_timetz_binary(conn, val, expr):
         ("3,0,0,456789~-2", "03:00:00.456789", "+02:00"),
     ],
 )
-def test_load_timetz(conn, val, timezone, expr):
-    cur = conn.cursor()
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
+def test_load_timetz(conn, val, timezone, expr, fmt_out):
+    cur = conn.cursor(binary=fmt_out)
     cur.execute(f"set timezone to '{timezone}'")
     cur.execute(f"select '{expr}'::timetz")
     assert cur.fetchone()[0] == as_time(val)
 
 
-@pytest.mark.xfail  # TODO: binary load
-@pytest.mark.parametrize("val, expr, timezone", [("0,0~2", "00:00", "-02:00")])
-def test_load_timetz_binary(conn, val, expr, timezone):
-    cur = conn.cursor(binary=Format.BINARY)
-    cur.execute(f"set timezone to '{timezone}'")
-    cur.execute(f"select '{expr}'::time")
-    assert cur.fetchone()[0] == as_time(val)
-
-
-def test_load_timetz_24(conn):
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
+def test_load_timetz_24(conn, fmt_out):
     cur = conn.cursor()
     cur.execute("select '24:00'::timetz")
     with pytest.raises(DataError):
@@ -414,8 +400,6 @@ def test_load_timetz_24(conn):
 )
 @pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
 def test_dump_time_tz_or_not_tz(conn, val, type, fmt_in):
-    if fmt_in == Format.BINARY:
-        pytest.xfail("binary time not implemented")
     val = as_time(val)
     cur = conn.cursor()
     cur.execute(