from ..adapt import Dumper, Loader
from ..proto import AdaptContext
-from ..errors import InterfaceError
+from ..errors import InterfaceError, DataError
from .oids import builtins
# Most likely we received a BC date, which Python doesn't support
# Otherwise the unexpected value is displayed in the exception.
if data.endswith(b"BC"):
- raise ValueError(
+ raise DataError(
"Python doesn't support BC date:"
f" got {data.decode('utf8', 'replace')}"
)
- # Find the year from the date. We check if >= Y10K only in ISO format,
- # others are too silly to bother being polite.
- ds = self._get_datestyle()
- if ds.startswith(b"ISO"):
- year = int(data.split(b"-", 1)[0])
- if year > 9999:
- raise ValueError(
- "Python date doesn't support years after 9999:"
- f" got {data.decode('utf8', 'replace')}"
- )
+ if self._get_year_digits(data) > 4:
+ raise DataError(
+ "Python date doesn't support years after 9999:"
+ f" got {data.decode('utf8', 'replace')}"
+ )
# We genuinely received something we cannot parse
raise exc
+ def _get_year_digits(self, data: bytes) -> int:
+ datesep = self._format[2].encode("ascii")
+ parts = data.split(b" ")[0].split(datesep)
+ return max(map(len, parts))
+
@Loader.text(builtins["time"].oid)
class TimeLoader(Loader):
def _raise_error(self, data: bytes, exc: ValueError) -> time:
# Most likely, time 24:00
if data.startswith(b"24"):
- raise ValueError("time with hour 24 not supported by Python")
+ raise DataError(
+ f"time not supported by Python: {data.decode('ascii')}"
+ )
# We genuinely received something we cannot parse
raise exc
def _raise_error(self, data: bytes, exc: ValueError) -> datetime:
return cast(datetime, super()._raise_error(data, exc))
+ def _get_year_digits(self, data: bytes) -> int:
+ # Find the year from the date.
+ if not self._get_datestyle().startswith(b"P"): # Postgres
+ return super()._get_year_digits(data)
+ else:
+ parts = data.split()
+ if len(parts) > 4:
+ return len(parts[4])
+ else:
+ return 0
+
@Loader.text(builtins["timestamptz"].oid)
class TimestamptzLoader(TimestampLoader):
import datetime as dt
import pytest
+from psycopg3 import DataError
from psycopg3.adapt import Format
cur = conn.cursor()
cur.execute(f"set datestyle = {datestyle_out}, YMD")
cur.execute("select %s - 1", (dt.date.min,))
- with pytest.raises(ValueError):
+ with pytest.raises(DataError):
cur.fetchone()[0]
cur = conn.cursor()
cur.execute(f"set datestyle = {datestyle_out}, YMD")
cur.execute("select %s + 1", (dt.date.max,))
- with pytest.raises(ValueError):
+ with pytest.raises(DataError):
cur.fetchone()[0]
assert cur.fetchone()[0] == as_dt(val)
+@pytest.mark.parametrize("datestyle_out", ["ISO", "Postgres", "SQL", "German"])
+def test_load_datetime_bc(conn, datestyle_out):
+ cur = conn.cursor()
+ cur.execute(f"set datestyle = {datestyle_out}, YMD")
+ cur.execute("select %s - '1s'::interval", (dt.datetime.min,))
+ with pytest.raises(DataError):
+ cur.fetchone()[0]
+
+
+@pytest.mark.parametrize("datestyle_out", ["ISO", "SQL", "Postgres", "German"])
+def test_load_datetime_too_large(conn, datestyle_out):
+ cur = conn.cursor()
+ cur.execute(f"set datestyle = {datestyle_out}, YMD")
+ cur.execute("select %s + '1s'::interval", (dt.datetime.max,))
+ with pytest.raises(DataError):
+ cur.fetchone()[0]
+
+
#
# datetime+tz tests
#
def test_load_time_24(conn):
cur = conn.cursor()
cur.execute("select '24:00'::time")
- with pytest.raises(ValueError):
+ with pytest.raises(DataError):
cur.fetchone()[0]