]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add time binary adapters
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 12 May 2021 19:38:58 +0000 (21:38 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 12 May 2021 20:45:37 +0000 (22:45 +0200)
No timezone yet, some tests fail.

psycopg3/psycopg3/types/__init__.py
psycopg3/psycopg3/types/date.py
tests/fix_faker.py
tests/types/test_date.py

index 2be28c4f0ff959904228f21ce3f43c03d2cb7c14..61337276fd20e738a1962948a730637d9c12be7d 100644 (file)
@@ -83,6 +83,7 @@ from .date import (
     DateDumper as DateDumper,
     DateBinaryDumper as DateBinaryDumper,
     TimeDumper as TimeDumper,
+    TimeBinaryDumper as TimeBinaryDumper,
     TimeTzDumper as TimeTzDumper,
     DateTimeTzDumper as DateTimeTzDumper,
     DateTimeDumper as DateTimeDumper,
@@ -90,6 +91,7 @@ from .date import (
     DateLoader as DateLoader,
     DateBinaryLoader as DateBinaryLoader,
     TimeLoader as TimeLoader,
+    TimeBinaryLoader as TimeBinaryLoader,
     TimeTzLoader as TimeTzLoader,
     TimestampLoader as TimestampLoader,
     TimestamptzLoader as TimestamptzLoader,
@@ -208,11 +210,13 @@ def register_default_globals(ctx: AdaptContext) -> None:
     DateDumper.register("datetime.date", ctx)
     DateBinaryDumper.register("datetime.date", ctx)
     TimeDumper.register("datetime.time", ctx)
+    TimeBinaryDumper.register("datetime.time", ctx)
     DateTimeTzDumper.register("datetime.datetime", ctx)
     TimeDeltaDumper.register("datetime.timedelta", ctx)
     DateLoader.register("date", ctx)
     DateBinaryLoader.register("date", ctx)
     TimeLoader.register("time", ctx)
+    TimeBinaryLoader.register("time", ctx)
     TimeTzLoader.register("timetz", ctx)
     TimestampLoader.register("timestamp", ctx)
     TimestamptzLoader.register("timestamptz", ctx)
index f348cab2a6d8beff68c412ba2aed941bf77d9ca7..427fc75a461e1779c90cf8f1e7ce70ae8f4f4930 100644 (file)
@@ -20,7 +20,9 @@ _PackInt = Callable[[int], bytes]
 _UnpackInt = Callable[[bytes], Tuple[int]]
 
 _pack_int4 = cast(_PackInt, struct.Struct("!i").pack)
+_pack_int8 = cast(_PackInt, struct.Struct("!q").pack)
 _unpack_int4 = cast(_UnpackInt, struct.Struct("!i").unpack)
+_unpack_int8 = cast(_UnpackInt, struct.Struct("!q").unpack)
 
 _pg_date_epoch = date(2000, 1, 1).toordinal()
 _py_date_min = date.min.toordinal()
@@ -48,26 +50,32 @@ class DateBinaryDumper(Dumper):
         return _pack_int4(days)
 
 
-class TimeDumper(Dumper):
-
-    format = Format.TEXT
+class _BaseTimeDumper(Dumper):
 
     # Can change to timetz type if the object dumped is naive
     _oid = builtins["time"].oid
 
-    def dump(self, obj: time) -> bytes:
-        return str(obj).encode("utf8")
-
     def get_key(
         self, obj: time, format: Pg3Format
     ) -> Union[type, Tuple[type]]:
-        # Use (cls,) to report the need to upgrade  to a dumper for timetz (the
+        # Use (cls,) to report the need to upgrade to a dumper for timetz (the
         # Frankenstein of the data types).
         if not obj.tzinfo:
             return self.cls
         else:
             return (self.cls,)
 
+    def upgrade(self, obj: time, format: Pg3Format) -> "Dumper":
+        raise NotImplementedError
+
+
+class TimeDumper(_BaseTimeDumper):
+
+    format = Format.TEXT
+
+    def dump(self, obj: time) -> bytes:
+        return str(obj).encode("utf8")
+
     def upgrade(self, obj: time, format: Pg3Format) -> "Dumper":
         if not obj.tzinfo:
             return self
@@ -80,6 +88,31 @@ class TimeTzDumper(TimeDumper):
     _oid = builtins["timetz"].oid
 
 
+class TimeBinaryDumper(_BaseTimeDumper):
+
+    format = Format.BINARY
+
+    def dump(self, obj: time) -> bytes:
+        ms = obj.microsecond + 1_000_000 * (
+            obj.second + 60 * (obj.minute + 60 * obj.hour)
+        )
+        return _pack_int8(ms)
+
+    def upgrade(self, obj: time, format: Pg3Format) -> "Dumper":
+        if not obj.tzinfo:
+            return self
+        else:
+            return TimeTzBinaryDumper(self.cls)
+
+
+class TimeTzBinaryDumper(TimeBinaryDumper):
+
+    _oid = builtins["timetz"].oid
+
+    def dump(self, obj: time) -> bytes:
+        raise NotImplementedError
+
+
 class DateTimeTzDumper(Dumper):
 
     format = Format.TEXT
@@ -244,6 +277,21 @@ class TimeLoader(Loader):
         raise exc
 
 
+class TimeBinaryLoader(Loader):
+
+    format = Format.BINARY
+
+    def load(self, data: Buffer) -> time:
+        val = _unpack_int8(data)[0]
+        val, ms = divmod(val, 1_000_000)
+        val, s = divmod(val, 60)
+        h, m = divmod(val, 60)
+        try:
+            return time(h, m, s, ms)
+        except ValueError:
+            raise DataError(f"time not supported by Python: hour={h}")
+
+
 class TimeTzLoader(TimeLoader):
 
     format = Format.TEXT
index 9eb81578c3867580729ba00796b440945937cfdc..777c5078064893ba81c33cb2629cac286e4ed5fa 100644 (file)
@@ -342,6 +342,13 @@ class Faker:
 
         return "".join(map(chr, rv))
 
+    def make_time(self, spec):
+        val = randrange(24 * 60 * 60 * 1_000_000)
+        val, ms = divmod(val, 1_000_000)
+        val, s = divmod(val, 60)
+        h, m = divmod(val, 60)
+        return dt.time(h, m, s, ms)
+
     def make_UUID(self, spec):
         return UUID(bytes=bytes([randrange(256) for i in range(16)]))
 
index 0f4251cf87f8bddad1e950966eeb157eea5e4974..10c2d8b8e4fe75496e90ecb762c95eadbd2c840c 100644 (file)
@@ -305,17 +305,10 @@ def test_dump_datetime_tz_or_not_tz(conn, val, type, fmt_in):
         ("max", "23:59:59.999999"),
     ],
 )
-def test_dump_time(conn, val, expr):
-    cur = conn.cursor()
-    cur.execute(f"select '{expr}'::time = %s", (as_time(val),))
-    assert cur.fetchone()[0] is True
-
-
-@pytest.mark.xfail  # TODO: binary dump
-@pytest.mark.parametrize("val, expr", [("0,0", "00:00")])
-def test_dump_time_binary(conn, val, expr):
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
+def test_dump_time(conn, val, expr, fmt_in):
     cur = conn.cursor()
-    cur.execute(f"select '{expr}'::time = %b", (as_time(val),))
+    cur.execute(f"select '{expr}'::time = %{fmt_in}", (as_time(val),))
     assert cur.fetchone()[0] is True
 
 
@@ -330,22 +323,16 @@ def test_dump_time_binary(conn, val, expr):
         ("max", "23:59:59.999999"),
     ],
 )
-def test_load_time(conn, val, expr):
-    cur = conn.cursor()
-    cur.execute(f"select '{expr}'::time")
-    assert cur.fetchone()[0] == as_time(val)
-
-
-@pytest.mark.xfail  # TODO: binary load
-@pytest.mark.parametrize("val, expr", [("0,0", "00:00")])
-def test_load_time_binary(conn, val, expr):
-    cur = conn.cursor(binary=Format.BINARY)
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
+def test_load_time(conn, val, expr, fmt_out):
+    cur = conn.cursor(binary=fmt_out)
     cur.execute(f"select '{expr}'::time")
     assert cur.fetchone()[0] == as_time(val)
 
 
-def test_load_time_24(conn):
-    cur = conn.cursor()
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
+def test_load_time_24(conn, fmt_out):
+    cur = conn.cursor(binary=fmt_out)
     cur.execute("select '24:00'::time")
     with pytest.raises(DataError):
         cur.fetchone()[0]