Callable[[bytes], Tuple[int, int, int]], struct.Struct("!qii").unpack
)
-
_pg_date_epoch_days = date(2000, 1, 1).toordinal()
_pg_datetime_epoch = datetime(2000, 1, 1)
_pg_datetimetz_epoch = datetime(2000, 1, 1, tzinfo=timezone.utc)
def _format_from_context(self) -> str:
ds = self._get_datestyle()
if ds.startswith(b"I"): # ISO
- return "%Y-%m-%d %H:%M:%S.%f%z"
+ if sys.version_info >= (3, 7):
+ return "%Y-%m-%d %H:%M:%S.%f%z"
+ else:
+ # No tz parsing: it will be handles separately.
+ return "%Y-%m-%d %H:%M:%S.%f"
# These don't work: the timezone name is not always displayed
# elif ds.startswith(b"G"): # German
setattr(self, "load", self._load_notimpl)
return ""
+ _re_tz = re.compile(br"([-+])(\d+)(?::(\d+)(?::(\d+))?)?$")
+
def load(self, data: Buffer) -> datetime:
if isinstance(data, memoryview):
data = bytes(data)
if data[-3] in (43, 45):
data += b"00"
- return super().load(data)
+ return super().load(data).astimezone(timezone.utc)
def _load_py36(self, data: Buffer) -> datetime:
if isinstance(data, memoryview):
data = bytes(data)
- # Drop seconds from timezone for Python 3.6
- # Also, Python 3.6 doesn't support HHMM, only HH:MM
- tzsep = (43, 45) # + and - bytes
- if data[-3] in tzsep: # +HH, -HH
- data += b"00"
- elif data[-6] in tzsep:
- data = data[:-3] + data[-2:]
- elif data[-9] in tzsep:
- data = data[:-6] + data[-5:-3]
- return super().load(data)
+ # Separate the timezone from the rest
+ m = self._re_tz.search(data)
+ if not m:
+ raise DataError(
+ "failed to parse timezone from '{data.decode('ascii')}'"
+ )
+
+ sign, hour, min, sec = m.groups()
+ tzoff = timedelta(
+ seconds=(int(sec) if sec else 0)
+ + 60 * ((int(min) if min else 0) + 60 * int(hour))
+ )
+ if sign == b"-":
+ tzoff = -tzoff
+
+ rv = super().load(data[: m.start()])
+ return (rv - tzoff).replace(tzinfo=timezone.utc)
def _load_notimpl(self, data: Buffer) -> datetime:
if isinstance(data, memoryview):
-import sys
import datetime as dt
import pytest
@pytest.mark.parametrize(
"val, expr",
[
- ("min~2", "0001-01-01 00:00"),
+ ("min~-2", "0001-01-01 00:00-02:00"),
("min~-12", "0001-01-01 00:00-12:00"),
- ("min~+12", "0001-01-01 00:00+12:00"),
("258,1,8,1,12,32,358261~1:2:3", "0258-1-8 1:12:32.358261+01:02:03"),
("1000,1,1,0,0~2", "1000-01-01 00:00+2"),
("2000,1,1,0,0~2", "2000-01-01 00:00+2"),
)
@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_dump_datetimetz(conn, val, expr, fmt_in):
- # adjust for Python 3.6 missing seconds in tzinfo
- if sys.version_info < (3, 7) and val.count(":") > 1:
- expr = expr.rsplit(":", 1)[0]
- val, rest = val.rsplit(":", 1)
- val += rest[3:] # skip tz seconds, but include micros
-
cur = conn.cursor()
cur.execute("set timezone to '-02:00'")
cur.execute(f"select '{expr}'::timestamptz = %{fmt_in}", (as_dt(val),))
cur = conn.cursor(binary=False)
cur.execute(f"set datestyle = {datestyle_out}, DMY")
cur.execute(f"set timezone to '{timezone}'")
- cur.execute(f"select '{expr}'::timestamptz")
- assert cur.fetchone()[0] == as_dt(val)
+ got = cur.execute(f"select '{expr}'::timestamptz").fetchone()[0]
+ assert got == as_dt(val)
+ assert got.tzinfo == dt.timezone.utc
@pytest.mark.parametrize("val, expr, timezone", load_datetimetz_samples)
def test_load_datetimetz_binary(conn, val, expr, timezone):
cur = conn.cursor(binary=True)
cur.execute(f"set timezone to '{timezone}'")
- cur.execute(f"select '{expr}'::timestamptz")
- assert cur.fetchone()[0] == as_utc_dt(val)
+ got = cur.execute(f"select '{expr}'::timestamptz").fetchone()[0]
+ assert got == as_dt(val)
+ assert got.tzinfo == dt.timezone.utc
@pytest.mark.xfail # parse timezone names
return rv
-# Note: as_dt and as_utc_dt return the same timestamp, the first in a specified
-# timezone, the second in utc. However on Python < 3.7 there can't be seconds
-# in the timezone offset, so the result might be wrong up to 30 seconds.
-
-
def as_dt(s):
if "~" not in s:
return as_naive_dt(s)
- s, off = s.split("~")
- rv = as_naive_dt(s)
- rv = rv.replace(tzinfo=as_tzinfo(off))
- return rv
-
-
-def as_utc_dt(s):
- if "~" not in s:
- return as_naive_dt(s)
-
s, off = s.split("~")
rv = as_naive_dt(s)
off = as_tzoffset(off)
def as_tzinfo(s):
off = as_tzoffset(s)
- if sys.version_info < (3, 7):
- off = dt.timedelta(seconds=round(off.total_seconds() // 60) * 60)
-
return dt.timezone(off)