- State if the date is too small or too large, not just "not supported".
- Use similar messages in text and binary format.
See #315
def load(self, data: Buffer) -> datetime:
m = self._re_format.match(data)
if not m:
- s = bytes(data).decode("utf8", "replace")
- if s.endswith("BC"):
- raise DataError(f"BC timestamps not supported, got {s!r}")
- raise DataError(f"can't parse timestamp {s!r}")
+ raise _get_timestamp_load_error(self.connection, data) from None
if self._order == self._ORDER_YMD:
ye, mo, da, ho, mi, se, fr = m.groups()
try:
return datetime(int(ye), imo, int(da), int(ho), int(mi), int(se), us)
- except ValueError as e:
- s = bytes(data).decode("utf8", "replace")
- raise DataError(f"can't parse timestamp {s!r}: {e}") from None
+ except ValueError as ex:
+ raise _get_timestamp_load_error(self.connection, data, ex) from None
class TimestampBinaryLoader(Loader):
def load(self, data: Buffer) -> datetime:
m = self._re_format.match(data)
if not m:
- s = bytes(data).decode("utf8", "replace")
- if s.endswith("BC"):
- raise DataError(f"BC timestamps not supported, got {s!r}")
- raise DataError(f"can't parse timestamp {s!r}")
+ raise _get_timestamp_load_error(self.connection, data) from None
ye, mo, da, ho, mi, se, fr, sgn, oh, om, os = m.groups()
except ValueError as e:
ex = e
- s = bytes(data).decode("utf8", "replace")
- raise DataError(f"can't parse timestamptz {s!r}: {ex}") from None
+ raise _get_timestamp_load_error(self.connection, data, ex) from None
def _load_notimpl(self, data: Buffer) -> datetime:
s = bytes(data).decode("utf8", "replace")
return b"ISO, DMY"
+def _get_timestamp_load_error(
+ conn: Optional["BaseConnection[Any]"], data: Buffer, ex: Optional[Exception] = None
+) -> Exception:
+ s = bytes(data).decode("utf8", "replace")
+
+ def is_overflow(s: str) -> bool:
+ if not s:
+ return False
+
+ ds = _get_datestyle(conn)
+ if not ds.startswith(b"P"): # Postgres
+ return len(s.split()[0]) > 10 # date is first token
+ else:
+ return len(s.split()[-1]) > 4 # year is last token
+
+ if s == "-infinity" or s.endswith("BC"):
+ return DataError("timestamp too small (before year 1): {s!r}")
+ elif s == "infinity" or is_overflow(s):
+ return DataError(f"timestamp too large (after year 10K): {s!r}")
+ else:
+ return DataError(f"can't parse timestamp {s!r}: {ex or '(unknown)'}")
+
+
_month_abbr = {
n: i
for i, n in enumerate(b"Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split(), 1)
cdef object cload(self, const char *data, size_t length):
cdef const char *end = data + length
if end[-1] == b'C': # ends with BC
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"BC timestamp not supported, got {s!r}")
+ raise _get_timestamp_load_error(self._pgconn, data) from None
if self._order == ORDER_PGDM or self._order == ORDER_PGMD:
return self._cload_pg(data, end)
# Parse the first 6 groups of digits (date and time)
ptr = _parse_date_values(data, end, vals, 6)
if ptr == NULL:
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"can't parse timetz {s!r}")
+ raise _get_timestamp_load_error(self._pgconn, data) from None
# Parse the microseconds
cdef int us = 0
return cdt.datetime_new(
y, m, d, vals[HO], vals[MI], vals[SE], us, None)
except ValueError as ex:
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"can't parse timestamp {s!r}: {ex}") from None
+ raise _get_timestamp_load_error(self._pgconn, data, ex) from None
cdef object _cload_pg(self, const char *data, const char *end):
DEF HO = 0
seps[1] = strchr(seps[0] + 1, b' ') if seps[0] != NULL else NULL
seps[2] = strchr(seps[1] + 1, b' ') if seps[1] != NULL else NULL
if seps[2] == NULL:
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"can't parse timestamp {s!r}")
+ raise _get_timestamp_load_error(self._pgconn, data) from None
# Parse the following 3 groups of digits (time)
ptr = _parse_date_values(seps[2] + 1, end, vals, 3)
if ptr == NULL:
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"can't parse timestamp {s!r}")
+ raise _get_timestamp_load_error(self._pgconn, data) from None
# Parse the microseconds
cdef int us = 0
# Parse the year
ptr = _parse_date_values(ptr + 1, end, vals + 3, 1)
if ptr == NULL:
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"can't parse timestamp {s!r}")
+ raise _get_timestamp_load_error(self._pgconn, data) from None
# Resolve the MD order
cdef int m, d
else: # self._order == ORDER_PGMD
m = _month_abbr[seps[0][1 : seps[1] - seps[0]]]
d = int(seps[1][1 : seps[2] - seps[1]])
- except (KeyError, ValueError):
- s = data.decode("utf8", "replace")
- raise e.DataError(f"can't parse timestamp: {s!r}") from None
+ except (KeyError, ValueError) as ex:
+ raise _get_timestamp_load_error(self._pgconn, data, ex) from None
try:
return cdt.datetime_new(
vals[YE], m, d, vals[HO], vals[MI], vals[SE], us, None)
except ValueError as ex:
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"can't parse timestamp {s!r}: {ex}") from None
+ raise _get_timestamp_load_error(self._pgconn, data, ex) from None
@cython.final
except OverflowError:
if val <= 0:
- raise e.DataError(
- "timestamp too small (before year 1)"
- ) from None
+ raise e.DataError("timestamp too small (before year 1)") from None
else:
- raise e.DataError(
- "timestamp too large (after year 10K)"
- ) from None
+ raise e.DataError("timestamp too large (after year 10K)") from None
cdef class _BaseTimestamptzLoader(CLoader):
self._order = ORDER_DMY
cdef object cload(self, const char *data, size_t length):
- cdef const char *end = data + length
- if end[-1] == b'C': # ends with BC
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"BC timestamptz not supported, got {s!r}")
-
if self._order != ORDER_YMD:
return self._cload_notimpl(data, length)
+ cdef const char *end = data + length
+ if end[-1] == b'C': # ends with BC
+ raise _get_timestamp_load_error(self._pgconn, data) from None
+
DEF D1 = 0
DEF D2 = 1
DEF D3 = 2
cdef const char *ptr
ptr = _parse_date_values(data, end, vals, 6)
if ptr == NULL:
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"can't parse timestamptz {s!r}")
+ raise _get_timestamp_load_error(self._pgconn, data) from None
# Parse the microseconds
cdef int us = 0
# Parse the timezone
cdef int offsecs = _parse_timezone_to_seconds(&ptr, end)
if ptr == NULL:
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"can't parse timestamptz {s!r}")
+ raise _get_timestamp_load_error(self._pgconn, data) from None
tzoff = cdt.timedelta_new(0, offsecs, 0)
except ValueError as ex:
ex1 = ex
- s = bytes(data).decode("utf8", "replace")
- raise e.DataError(f"can't parse timestamptz {s!r}: {ex1}") from None
+ raise _get_timestamp_load_error(self._pgconn, data, ex1) from None
cdef object _cload_notimpl(self, const char *data, size_t length):
s = bytes(data)[:length].decode("utf8", "replace")
return tz
+cdef object _get_timestamp_load_error(
+ pq.PGconn pgconn, const char *data, ex: Optional[Exception] = None
+):
+ s = bytes(data).decode("utf8", "replace")
+
+ def is_overflow(s):
+ if not s:
+ return False
+
+ ds = _get_datestyle(pgconn)
+ if not ds.startswith(b"P"): # Postgres
+ return len(s.split()[0]) > 10 # date is first token
+ else:
+ return len(s.split()[-1]) > 4 # year is last token
+
+ if s == "-infinity" or s.endswith("BC"):
+ return e.DataError("timestamp too small (before year 1): {s!r}")
+ elif s == "infinity" or is_overflow(s):
+ return e.DataError(f"timestamp too large (after year 10K): {s!r}")
+ else:
+ return e.DataError(f"can't parse timestamp {s!r}: {ex or '(unknown)'}")
+
+
cdef _timezones = {}
_timezones[None] = timezone_utc
_timezones[b"UTC"] = timezone_utc
+
cdef object _timezone_from_connection(pq.PGconn pgconn):
"""Return the Python timezone info of the connection's timezone."""
if pgconn is None:
with pytest.raises(DataError):
cur.fetchone()[0]
+ overflow_samples = [
+ ("-infinity", "timestamp too small"),
+ ("1000-01-01 12:00 BC", "timestamp too small"),
+ ("10000-01-01 12:00", "timestamp too large"),
+ ("infinity", "timestamp too large"),
+ ]
+
+ @pytest.mark.parametrize("datestyle_out", ["ISO", "Postgres", "SQL", "German"])
+ @pytest.mark.parametrize("val, msg", overflow_samples)
+ def test_overflow_message(self, conn, datestyle_out, val, msg):
+ cur = conn.cursor()
+ cur.execute(f"set datestyle = {datestyle_out}, YMD")
+ cur.execute("select %s::timestamp", (val,))
+ with pytest.raises(DataError) as excinfo:
+ cur.fetchone()[0]
+ assert msg in str(excinfo.value)
+
+ @pytest.mark.parametrize("val, msg", overflow_samples)
+ def test_overflow_message_binary(self, conn, val, msg):
+ cur = conn.cursor(binary=True)
+ cur.execute("select %s::timestamp", (val,))
+ with pytest.raises(DataError) as excinfo:
+ cur.fetchone()[0]
+ assert msg in str(excinfo.value)
+
def test_load_all_month_names(self, conn):
cur = conn.cursor(binary=False)
cur.execute("set datestyle = 'Postgres'")
assert rec[0] == want
assert rec[1] == 11111111
+ overflow_samples = [
+ ("-infinity", "timestamp too small"),
+ ("1000-01-01 12:00+00 BC", "timestamp too small"),
+ ("10000-01-01 12:00+00", "timestamp too large"),
+ ("infinity", "timestamp too large"),
+ ]
+
+ @pytest.mark.parametrize("datestyle_out", ["ISO", "Postgres", "SQL", "German"])
+ @pytest.mark.parametrize("val, msg", overflow_samples)
+ def test_overflow_message(self, conn, datestyle_out, val, msg):
+ cur = conn.cursor()
+ cur.execute(f"set datestyle = {datestyle_out}, YMD")
+ cur.execute("select %s::timestamptz", (val,))
+ if datestyle_out == "ISO":
+ with pytest.raises(DataError) as excinfo:
+ cur.fetchone()[0]
+ assert msg in str(excinfo.value)
+ else:
+ with pytest.raises(NotImplementedError):
+ cur.fetchone()[0]
+
+ @pytest.mark.parametrize("val, msg", overflow_samples)
+ def test_overflow_message_binary(self, conn, val, msg):
+ cur = conn.cursor(binary=True)
+ cur.execute("select %s::timestamptz", (val,))
+ with pytest.raises(DataError) as excinfo:
+ cur.fetchone()[0]
+ assert msg in str(excinfo.value)
+
@pytest.mark.parametrize(
"valname, tzval, tzname",
[