TimeTzDumper as TimeTzDumper,
TimeTzBinaryDumper as TimeTzBinaryDumper,
DateTimeTzDumper as DateTimeTzDumper,
+ DateTimeTzBinaryDumper as DateTimeTzBinaryDumper,
DateTimeDumper as DateTimeDumper,
+ DateTimeBinaryDumper as DateTimeBinaryDumper,
TimeDeltaDumper as TimeDeltaDumper,
DateLoader as DateLoader,
DateBinaryLoader as DateBinaryLoader,
TimeTzLoader as TimeTzLoader,
TimeTzBinaryLoader as TimeTzBinaryLoader,
TimestampLoader as TimestampLoader,
+ TimestampBinaryLoader as TimestampBinaryLoader,
TimestamptzLoader as TimestamptzLoader,
IntervalLoader as IntervalLoader,
)
TimeDumper.register("datetime.time", ctx)
TimeBinaryDumper.register("datetime.time", ctx)
DateTimeTzDumper.register("datetime.datetime", ctx)
+ DateTimeTzBinaryDumper.register("datetime.datetime", ctx)
TimeDeltaDumper.register("datetime.timedelta", ctx)
DateLoader.register("date", ctx)
DateBinaryLoader.register("date", ctx)
TimeTzLoader.register("timetz", ctx)
TimeTzBinaryLoader.register("timetz", ctx)
TimestampLoader.register("timestamp", ctx)
+ TimestampBinaryLoader.register("timestamp", ctx)
TimestamptzLoader.register("timestamptz", ctx)
IntervalLoader.register("interval", ctx)
)
_pack_timetz = cast(Callable[[int, int], bytes], struct.Struct("!qi").pack)
-_pg_date_epoch = date(2000, 1, 1).toordinal()
-_py_date_min = date.min.toordinal()
-_py_date_max = date.max.toordinal()
+_pg_date_epoch_days = date(2000, 1, 1).toordinal()
+_pg_datetime_epoch = datetime(2000, 1, 1)
+_pg_datetimetz_epoch = datetime(2000, 1, 1, tzinfo=timezone.utc)
+_py_date_min_days = date.min.toordinal()
class DateDumper(Dumper):
_oid = builtins["date"].oid
def dump(self, obj: date) -> bytes:
- days = obj.toordinal() - _pg_date_epoch
+ days = obj.toordinal() - _pg_date_epoch_days
return _pack_int4(days)
return _pack_timetz(ms, -int(off.total_seconds()))
-class DateTimeTzDumper(Dumper):
-
- format = Format.TEXT
+class _BaseDateTimeDumper(Dumper):
# Can change to timestamp type if the object dumped is naive
_oid = builtins["timestamptz"].oid
- def dump(self, obj: datetime) -> bytes:
- # NOTE: whatever the PostgreSQL DateStyle input format (DMY, MDY, YMD)
- # the YYYY-MM-DD is always understood correctly.
- return str(obj).encode("utf8")
-
def get_key(
self, obj: datetime, format: Pg3Format
) -> Union[type, Tuple[type]]:
else:
return (self.cls,)
+ def upgrade(self, obj: datetime, format: Pg3Format) -> "Dumper":
+ raise NotImplementedError
+
+
+class DateTimeTzDumper(_BaseDateTimeDumper):
+
+ format = Format.TEXT
+
+ def dump(self, obj: datetime) -> bytes:
+ # NOTE: whatever the PostgreSQL DateStyle input format (DMY, MDY, YMD)
+ # the YYYY-MM-DD is always understood correctly.
+ return str(obj).encode("utf8")
+
def upgrade(self, obj: datetime, format: Pg3Format) -> "Dumper":
if obj.tzinfo:
return self
_oid = builtins["timestamp"].oid
+class DateTimeTzBinaryDumper(_BaseDateTimeDumper):
+
+ format = Format.BINARY
+
+ def dump(self, obj: datetime) -> bytes:
+ raise NotImplementedError
+
+ def upgrade(self, obj: datetime, format: Pg3Format) -> "Dumper":
+ if obj.tzinfo:
+ return self
+ else:
+ return DateTimeBinaryDumper(self.cls)
+
+
+class DateTimeBinaryDumper(DateTimeTzBinaryDumper):
+ _oid = builtins["timestamp"].oid
+
+ # Somewhere, between year 2270 and 2275, float rounding in total_seconds
+ # cause us errors: switch to an algorithm without rounding before then.
+ _delta_prec_loss = (
+ datetime(2250, 1, 1) - _pg_datetime_epoch
+ ).total_seconds()
+
+ def dump(self, obj: datetime) -> bytes:
+ delta = obj - _pg_datetime_epoch
+ secs = delta.total_seconds()
+ if secs < self._delta_prec_loss:
+ micros = int(1_000_000 * secs)
+ else:
+ micros = (
+ 1_000_000 * (86_400 * delta.days + delta.seconds)
+ + delta.microseconds
+ )
+ return _pack_int8(micros)
+
+
class TimeDeltaDumper(Dumper):
format = Format.TEXT
format = Format.BINARY
def load(self, data: Buffer) -> date:
- days = _unpack_int4(data)[0] + _pg_date_epoch
- if _py_date_min <= days <= _py_date_max:
+ days = _unpack_int4(data)[0] + _pg_date_epoch_days
+ try:
return date.fromordinal(days)
- else:
- if days < _py_date_min:
+ except ValueError:
+ if days < _py_date_min_days:
raise DataError("date too small (before year 1)")
else:
raise DataError("date too large (after year 10K)")
return 0
+class TimestampBinaryLoader(Loader):
+
+ format = Format.BINARY
+
+ def load(self, data: Buffer) -> datetime:
+ micros = _unpack_int8(data)[0]
+ try:
+ return _pg_datetime_epoch + timedelta(microseconds=micros)
+ except OverflowError:
+ if micros <= 0:
+ raise DataError("timestamp too small (before year 1)")
+ else:
+ raise DataError("timestamp too large (after year 10K)")
+
+
class TimestamptzLoader(TimestampLoader):
format = Format.TEXT
("min", "0001-01-01 00:00"),
("1000,1,1,0,0", "1000-01-01 00:00"),
("2000,1,1,0,0", "2000-01-01 00:00"),
- ("2000,12,31,23,59,59,999999", "2000-12-31 23:59:59.999999"),
- ("3000,1,1,0,0", "3000-01-01 00:00"),
+ ("2000,1,2,3,4,5,6", "2000-01-02 03:04:05.000006"),
+ ("2000,1,2,3,4,5,678", "2000-01-02 03:04:05.000678"),
+ ("2000,1,2,3,0,0,456789", "2000-01-02 03:00:00.456789"),
+ ("2000,1,1,0,0,0,1", "2000-01-01 00:00:00.000001"),
+ ("2200,1,1,0,0,0,1", "2200-01-01 00:00:00.000001"),
+ ("2300,1,1,0,0,0,1", "2300-01-01 00:00:00.000001"),
+ ("7000,1,1,0,0,0,1", "7000-01-01 00:00:00.000001"),
("max", "9999-12-31 23:59:59.999999"),
],
)
-def test_dump_datetime(conn, val, expr):
- cur = conn.cursor()
- cur.execute("set timezone to '+02:00'")
- cur.execute(f"select '{expr}'::timestamp = %s", (as_dt(val),))
- assert cur.fetchone()[0] is True
-
-
-@pytest.mark.xfail # TODO: binary dump
-@pytest.mark.parametrize(
- "val, expr",
- [("2000,1,1,0,0", "'2000-01-01 00:00'::timestamp")],
-)
-def test_dump_datetime_binary(conn, val, expr):
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
+def test_dump_datetime(conn, val, expr, fmt_in):
cur = conn.cursor()
cur.execute("set timezone to '+02:00'")
- cur.execute(f"select {expr} = %b", (as_dt(val),))
+ cur.execute(f"select %{fmt_in}", (as_dt(val),))
+ print(cur.fetchone()[0])
+ cur.execute(f"select '{expr}'::timestamp = %{fmt_in}", (as_dt(val),))
assert cur.fetchone()[0] is True
cur = conn.cursor()
cur.execute(f"set datestyle = ISO, {datestyle_in}")
cur.execute(
- "select 'epoch'::timestamp + '1d 3h 4m 5s'::interval = %s",
+ "select 'epoch'::timestamp + '1d 3h 4m 5s'::interval = %t",
(dt.datetime(1970, 1, 2, 3, 4, 5),),
)
assert cur.fetchone()[0] is True
-@pytest.mark.parametrize(
- "val, expr",
- [
- ("min", "0001-01-01"),
- ("1000,1,1", "1000-01-01"),
- ("2000,1,1", "2000-01-01"),
- ("2000,1,2,3,4,5,6", "2000-01-02 03:04:05.000006"),
- ("2000,1,2,3,4,5,678", "2000-01-02 03:04:05.000678"),
- ("2000,1,2,3,0,0,456789", "2000-01-02 03:00:00.456789"),
- ("2000,12,31", "2000-12-31"),
- ("3000,1,1", "3000-01-01"),
- ("max", "9999-12-31 23:59:59.999999"),
- ],
-)
+load_datetime_samples = [
+ ("min", "0001-01-01"),
+ ("1000,1,1", "1000-01-01"),
+ ("2000,1,1", "2000-01-01"),
+ ("2000,1,2,3,4,5,6", "2000-01-02 03:04:05.000006"),
+ ("2000,1,2,3,4,5,678", "2000-01-02 03:04:05.000678"),
+ ("2000,1,2,3,0,0,456789", "2000-01-02 03:00:00.456789"),
+ ("2000,12,31", "2000-12-31"),
+ ("3000,1,1", "3000-01-01"),
+ ("max", "9999-12-31 23:59:59.999999"),
+]
+
+
+@pytest.mark.parametrize("val, expr", load_datetime_samples)
@pytest.mark.parametrize("datestyle_out", ["ISO", "Postgres", "SQL", "German"])
@pytest.mark.parametrize("datestyle_in", ["DMY", "MDY", "YMD"])
def test_load_datetime(conn, val, expr, datestyle_in, datestyle_out):
assert cur.fetchone()[0] == as_dt(val)
+@pytest.mark.parametrize("val, expr", load_datetime_samples)
+def test_load_datetime_binary(conn, val, expr):
+ cur = conn.cursor(binary=True)
+ cur.execute("set timezone to '+02:00'")
+ cur.execute(f"select '{expr}'::timestamp")
+ assert cur.fetchone()[0] == as_dt(val)
+
+
@pytest.mark.parametrize("val", ["min", "max"])
@pytest.mark.parametrize("datestyle_out", ["ISO", "Postgres", "SQL", "German"])
def test_load_datetime_overflow(conn, val, datestyle_out):
cur.fetchone()[0]
+@pytest.mark.parametrize("val", ["min", "max"])
+def test_load_datetime_overflow_binary(conn, val):
+ cur = conn.cursor(binary=True)
+ cur.execute(
+ "select %t::timestamp + %s * '1s'::interval",
+ (as_dt(val), -1 if val == "min" else 1),
+ )
+ with pytest.raises(DataError):
+ cur.fetchone()[0]
+
+
#
# datetime+tz tests
#