From: Daniele Varrazzo Date: Tue, 27 Oct 2020 00:27:54 +0000 (+0100) Subject: Added time with timezone loader X-Git-Tag: 3.0.dev0~431 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6e4a426b4372fecd66ff2a43eb6ebc9276d92feb;p=thirdparty%2Fpsycopg.git Added time with timezone loader --- diff --git a/psycopg3/psycopg3/types/date.py b/psycopg3/psycopg3/types/date.py index 9837ffd1b..b54ca4560 100644 --- a/psycopg3/psycopg3/types/date.py +++ b/psycopg3/psycopg3/types/date.py @@ -149,6 +149,25 @@ class TimeLoader(Loader): raise exc +@Loader.text(builtins["timetz"].oid) +class TimeTzLoader(TimeLoader): + _format = "%H:%M:%S.%f%z" + _format_no_micro = _format.replace(".%f", "") + + def load(self, data: bytes) -> time: + # Hack to convert +HH in +HHMM + if data[-3:-2] in (b"-", b"+"): + data += b"00" + + fmt = self._format if b"." in data else self._format_no_micro + try: + dt = datetime.strptime(self._decode(data)[0], fmt) + except ValueError as e: + return self._raise_error(data, e) + + return dt.time().replace(tzinfo=dt.tzinfo) + + @Loader.text(builtins["timestamp"].oid) class TimestampLoader(DateLoader): def __init__(self, oid: int, context: AdaptContext): diff --git a/tests/types/test_date.py b/tests/types/test_date.py index fc154c9d7..1b76d7bf5 100644 --- a/tests/types/test_date.py +++ b/tests/types/test_date.py @@ -10,12 +10,6 @@ from psycopg3.adapt import Format # -def as_date(s): - return ( - dt.date(*map(int, s.split(","))) if "," in s else getattr(dt.date, s) - ) - - @pytest.mark.parametrize( "val, expr", [ @@ -70,7 +64,7 @@ def test_load_date(conn, val, expr): @pytest.mark.parametrize("val, expr", [("2000,1,1", "2000-01-01")]) def test_load_date_binary(conn, val, expr): cur = conn.cursor(format=Format.BINARY) - cur.execute("select '{expr}'::date" % expr) + cur.execute(f"select '{expr}'::date") assert cur.fetchone()[0] == as_date(val) @@ -105,28 +99,6 @@ def test_load_date_too_large(conn, datestyle_out): # -def as_dt(s): - if "~" in s: - s, off = s.split("~") - else: - off = None - - if "," in s: - rv = dt.datetime(*map(int, s.split(","))) - else: - rv = getattr(dt.datetime, s) - - if off: - tzoff = dt.timedelta( - **dict( - zip(("hours", "minutes", "seconds"), map(int, off.split(":"))) - ) - ) - rv = rv.replace(tzinfo=dt.timezone(tzoff)) - - return rv - - @pytest.mark.parametrize( "val, expr", [ @@ -153,7 +125,7 @@ def test_dump_datetime(conn, val, expr): def test_dump_datetime_binary(conn, val, expr): cur = conn.cursor() cur.execute("set timezone to '+02:00'") - cur.execute("select %s = %%b" % expr, (as_dt(val),)) + cur.execute(f"select {expr} = %b", (as_dt(val),)) assert cur.fetchone()[0] is True @@ -219,8 +191,14 @@ def test_load_datetime_too_large(conn, datestyle_out): "val, expr", [ ("min~2", "0001-01-01 00:00"), + ("min~-12", "0001-01-01 00:00-12:00"), + ("min~+12", "0001-01-01 00:00+12:00"), ("1000,1,1,0,0~2", "1000-01-01 00:00+2"), ("2000,1,1,0,0~2", "2000-01-01 00:00+2"), + ("2000,1,1,0,0~12", "2000-01-01 00:00+12"), + ("2000,1,1,0,0~-12", "2000-01-01 00:00-12"), + ("2000,1,1,0,0~01:02:03", "2000-01-01 00:00+01:02:03"), + ("2000,1,1,0,0~-01:02:03", "2000-01-01 00:00-01:02:03"), ("2000,12,31,23,59,59,999999~2", "2000-12-31 23:59:59.999999+2"), ("3000,1,1,0,0~2", "3000-01-01 00:00+2"), ("max~2", "9999-12-31 23:59:59.999999"), @@ -263,6 +241,7 @@ def test_dump_datetimetz_datestyle(conn, datestyle_in): ("2000,1,2,3,4,5,678~1", "2000-01-02 03:04:05.000678", "Europe/Rome"), ("2000,7,2,3,4,5,678~2", "2000-07-02 03:04:05.000678", "Europe/Rome"), ("2000,1,2,3,0,0,456789~2", "2000-01-02 03:00:00.456789", "-02:00"), + ("2000,1,2,3,0,0,456789~-2", "2000-01-02 03:00:00.456789", "+02:00"), ("2000,12,31~2", "2000-12-31", "-02:00"), ("1900,1,1~05:21:10", "1900-01-01", "Asia/Calcutta"), ], @@ -293,12 +272,6 @@ def test_load_datetimetz_tzname(conn, val, expr, datestyle_in, datestyle_out): # -def as_time(s): - return ( - dt.time(*map(int, s.split(","))) if "," in s else getattr(dt.time, s) - ) - - @pytest.mark.parametrize( "val, expr", [ @@ -314,10 +287,10 @@ def test_dump_time(conn, val, expr): @pytest.mark.xfail # TODO: binary dump -@pytest.mark.parametrize("val, expr", [(dt.time(0, 0), "00:00")]) +@pytest.mark.parametrize("val, expr", [("0,0", "00:00")]) def test_dump_time_binary(conn, val, expr): cur = conn.cursor() - cur.execute(f"select '{expr}'::time = %b", (val,)) + cur.execute(f"select '{expr}'::time = %b", (as_time(val),)) assert cur.fetchone()[0] is True @@ -342,7 +315,7 @@ def test_load_time(conn, val, expr): @pytest.mark.parametrize("val, expr", [("0,0", "00:00")]) def test_load_time_binary(conn, val, expr): cur = conn.cursor(format=Format.BINARY) - cur.execute("select '{expr}'::time" % expr) + cur.execute(f"select '{expr}'::time") assert cur.fetchone()[0] == as_time(val) @@ -351,3 +324,123 @@ def test_load_time_24(conn): cur.execute("select '24:00'::time") with pytest.raises(DataError): cur.fetchone()[0] + + +# +# time+tz tests +# + + +@pytest.mark.parametrize( + "val, expr", + [ + ("min~-10", "00:00-10:00"), + ("min~+12", "00:00+12:00"), + ("10,20,30,40~-2", "10:20:30.000040-02:00"), + ("10,20,30,40~0", "10:20:30.000040Z"), + ("10,20,30,40~+2:30", "10:20:30.000040+02:30"), + ("max~-12", "23:59:59.999999-12:00"), + ("max~+12", "23:59:59.999999+12:00"), + ], +) +def test_dump_timetz(conn, val, expr): + cur = conn.cursor() + cur.execute("set timezone to '-02:00'") + cur.execute(f"select '{expr}'::timetz = %s", (as_time(val),)) + assert cur.fetchone()[0] is True + + +@pytest.mark.xfail # TODO: binary dump +@pytest.mark.parametrize("val, expr", [("0,0~0", "00:00Z")]) +def test_dump_timetz_binary(conn, val, expr): + cur = conn.cursor() + cur.execute(f"select '{expr}'::time = %b", (as_time(val),)) + assert cur.fetchone()[0] is True + + +@pytest.mark.parametrize( + "val, expr, timezone", + [ + ("0,0~-12", "00:00", "12:00"), + ("0,0~12", "00:00", "-12:00"), + ("3,4,5,6~2", "03:04:05.000006", "-02:00"), + ("3,4,5,6~7:8", "03:04:05.000006", "-07:08"), + ("3,0,0,456789~2", "03:00:00.456789", "-02:00"), + ("3,0,0,456789~-2", "03:00:00.456789", "+02:00"), + ], +) +def test_load_timetz(conn, val, timezone, expr): + cur = conn.cursor() + cur.execute(f"set timezone to '{timezone}'") + cur.execute(f"select '{expr}'::timetz") + assert cur.fetchone()[0] == as_time(val) + + +@pytest.mark.xfail # TODO: binary load +@pytest.mark.parametrize("val, expr, timezone", [("0,0~2", "00:00", "-02:00")]) +def test_load_timetz_binary(conn, val, expr, timezone): + cur = conn.cursor(format=Format.BINARY) + cur.execute(f"set timezone to '{timezone}'") + cur.execute(f"select '{expr}'::time") + assert cur.fetchone()[0] == as_time(val) + + +def test_load_timetz_24(conn): + cur = conn.cursor() + cur.execute("select '24:00'::timetz") + with pytest.raises(DataError): + cur.fetchone()[0] + + +# +# Support +# + + +def as_date(s): + return ( + dt.date(*map(int, s.split(","))) if "," in s else getattr(dt.date, s) + ) + + +def as_time(s): + if "~" in s: + s, off = s.split("~") + else: + off = None + + rv = dt.time(*map(int, s.split(","))) if "," in s else getattr(dt.time, s) + if off: + rv = rv.replace(tzinfo=as_tzinfo(off)) + + return rv + + +def as_dt(s): + if "~" in s: + s, off = s.split("~") + else: + off = None + + if "," in s: + rv = dt.datetime(*map(int, s.split(","))) + else: + rv = getattr(dt.datetime, s) + + if off: + rv = rv.replace(tzinfo=as_tzinfo(off)) + + return rv + + +def as_tzinfo(s): + if s.startswith("-"): + mul = -1 + s = s[1:] + else: + mul = 1 + + tzoff = mul * dt.timedelta( + **dict(zip(("hours", "minutes", "seconds"), map(int, s.split(":")))) + ) + return dt.timezone(tzoff)