]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Added timestamptz loader
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 26 Oct 2020 15:32:14 +0000 (16:32 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 28 Oct 2020 03:19:24 +0000 (04:19 +0100)
Won't deal with formats other than ISO because they return the timezone
name and that requires interpretation by the client, using a tz library
such as pytz or Python 3.9

psycopg3/psycopg3/types/date.py
tests/types/test_date.py

index ebde6185271b73285943cfc44395420dce116551..576bcf4e84694497d13df40957e2c1afec7c6d1c 100644 (file)
@@ -146,3 +146,34 @@ class TimestampLoader(DateLoader):
 
     def _raise_error(self, data: bytes, exc: ValueError) -> datetime:
         return cast(datetime, super()._raise_error(data, exc))
+
+
+@Loader.text(builtins["timestamptz"].oid)
+class TimestamptzLoader(TimestampLoader):
+    def _format_from_context(self) -> str:
+        ds = self._get_datestyle()
+        if ds.startswith(b"I"):  # ISO
+            return "%Y-%m-%d %H:%M:%S.%f%z"
+        elif ds.startswith(b"G"):  # German
+            return "%d.%m.%Y %H:%M:%S.%f %Z"
+        elif ds.startswith(b"S"):  # SQL
+            return (
+                "%d/%m/%Y %H:%M:%S.%f %Z"
+                if ds.endswith(b"DMY")
+                else "%m/%d/%Y %H:%M:%S.%f %Z"
+            )
+        elif ds.startswith(b"P"):  # Postgres
+            return (
+                "%a %d %b %H:%M:%S.%f %Y %Z"
+                if ds.endswith(b"DMY")
+                else "%a %b %d %H:%M:%S.%f %Y %Z"
+            )
+        else:
+            raise InterfaceError(f"unexpected DateStyle: {ds.decode('ascii')}")
+
+    def load(self, data: bytes) -> datetime:
+        # Hack to convert +HH in +HHMM
+        if data[-3:-2] in (b"-", b"+"):
+            data += b"00"
+
+        return super().load(data)
index d530217ad9b1949624cf58b55828f162b3eb281f..b3d4a14b43d405c8e507f96d415f98d66e69a730 100644 (file)
@@ -222,3 +222,64 @@ def test_dump_datetimetz_datestyle(conn, datestyle_in):
         (dt.datetime(1970, 1, 2, 5, 4, 5, 678000, tzinfo=tzinfo),),
     )
     assert cur.fetchone()[0] is True
+
+
+@pytest.mark.parametrize(
+    "val, offset, expr, timezone",
+    [
+        ("2000,1,1", "02:00", "2000-01-01", "-02:00"),
+        ("2000,1,2,3,4,5,6", "02:00", "2000-01-02 03:04:05.000006", "-02:00"),
+        (
+            "2000,1,2,3,4,5,678",
+            "01:00",
+            "2000-01-02 03:04:05.000678",
+            "Europe/Rome",
+        ),
+        (
+            "2000,7,2,3,4,5,678",
+            "02:00",
+            "2000-07-02 03:04:05.000678",
+            "Europe/Rome",
+        ),
+        (
+            "2000,1,2,3,0,0,456789",
+            "02:00",
+            "2000-01-02 03:00:00.456789",
+            "-02:00",
+        ),
+        ("2000,12,31", "02:00", "2000-12-31", "-02:00"),
+        ("1900,1,1", "05:21:10", "1900-01-01", "Asia/Calcutta"),
+    ],
+)
+@pytest.mark.parametrize("datestyle_out", ["ISO"])
+def test_load_datetimetz(conn, val, offset, expr, timezone, datestyle_out):
+    cur = conn.cursor()
+    cur.execute(f"set datestyle = {datestyle_out}, DMY")
+    val = dt.datetime(*map(int, val.split(",")))
+    tzoff = dt.timedelta(
+        **dict(
+            zip(("hours", "minutes", "seconds"), map(int, offset.split(":")))
+        )
+    )
+    val = val.replace(tzinfo=dt.timezone(tzoff))
+    cur.execute(f"set timezone to '{timezone}'")
+    cur.execute(f"select '{expr}'::timestamptz")
+    assert cur.fetchone()[0] == val
+
+
+@pytest.mark.xfail  # parse timezone names
+@pytest.mark.parametrize("val, expr", [("2000,1,1", "2000-01-01")])
+@pytest.mark.parametrize("datestyle_out", ["SQL", "Postgres", "German"])
+@pytest.mark.parametrize("datestyle_in", ["DMY", "MDY", "YMD"])
+def test_load_datetimetz_tzname(conn, val, expr, datestyle_in, datestyle_out):
+    cur = conn.cursor()
+    cur.execute(f"set datestyle = {datestyle_out}, {datestyle_in}")
+    val = (
+        dt.datetime(*map(int, val.split(",")))
+        if "," in val
+        else getattr(dt.datetime, val)
+    )
+    val = val.replace(tzinfo=dt.timezone(dt.timedelta(hours=2)))
+    cur.execute("set timezone to '-02:00'")
+    cur.execute(f"select '{expr}'::timestamptz")
+    assert cur.fetchone()[0] == val