cur = conn.cursor(binary=True)
cur.execute(f"set timezone to '{timezone}'")
cur.execute(f"select '{expr}'::timestamptz")
- assert cur.fetchone()[0] == as_dt(val)
+ assert cur.fetchone()[0] == as_utc_dt(val)
@pytest.mark.xfail # parse timezone names
return rv
+# Note: as_dt and as_utc_dt return the same timestamp, the first in a specified
+# timezone, the second in utc. However on Python < 3.7 there can't be seconds
+# in the timezone offset, so the result might be wrong up to 30 seconds.
+
+
def as_dt(s):
- if "~" in s:
- s, off = s.split("~")
- else:
- off = None
+ if "~" not in s:
+ return as_naive_dt(s)
+
+ s, off = s.split("~")
+ rv = as_naive_dt(s)
+ rv = rv.replace(tzinfo=as_tzinfo(off))
+ return rv
+
+def as_utc_dt(s):
+ if "~" not in s:
+ return as_naive_dt(s)
+
+ s, off = s.split("~")
+ rv = as_naive_dt(s)
+ off = as_tzoffset(off)
+ rv = (rv - off).replace(tzinfo=dt.timezone.utc)
+ return rv
+
+
+def as_naive_dt(s):
if "," in s:
rv = dt.datetime(*map(int, s.split(",")))
else:
rv = getattr(dt.datetime, s)
- if off:
- rv = rv.replace(tzinfo=as_tzinfo(off))
-
return rv
-def as_tzinfo(s):
+def as_tzoffset(s):
if s.startswith("-"):
mul = -1
s = s[1:]
else:
mul = 1
+ fields = ("hours", "minutes", "seconds")
+ return mul * dt.timedelta(**dict(zip(fields, map(int, s.split(":")))))
+
+
+def as_tzinfo(s):
+ off = as_tzoffset(s)
if sys.version_info < (3, 7):
- fields = ("hours", "minutes")
- else:
- fields = ("hours", "minutes", "seconds")
- tzoff = mul * dt.timedelta(**dict(zip(fields, map(int, s.split(":")))))
- return dt.timezone(tzoff)
+ off = dt.timedelta(seconds=round(off.total_seconds() // 60) * 60)
+
+ return dt.timezone(off)
def as_td(s):