from .generators import notifies
from ._preparing import PrepareManager
from .transaction import Transaction, AsyncTransaction
-from .utils.compat import asynccontextmanager
+from .utils.compat import asynccontextmanager, ZoneInfo
from .server_cursor import ServerCursor, AsyncServerCursor
logger = logging.getLogger("psycopg3")
connect = generators.connect
execute = generators.execute
+_UTC = ZoneInfo("UTC")
+
class Notify(NamedTuple):
"""An asynchronous notification received from the database."""
if result.status != ExecStatus.TUPLES_OK:
raise e.error_from_result(result, encoding=self.client_encoding)
+ @property
+ def timezone(self) -> ZoneInfo:
+ """The Python timezone info of the connection's timezone."""
+ tzname = self.pgconn.parameter_status(b"TimeZone")
+ if tzname:
+ try:
+ return ZoneInfo(tzname.decode("utf8"))
+ except KeyError:
+ logger.warning(
+ "unknown PostgreSQL timezone: %r will use UTC",
+ tzname.decode("utf8"),
+ )
+ return _UTC
+ else:
+ return _UTC
+
@property
def info(self) -> ConnectionInfo:
"""A `ConnectionInfo` attribute to inspect connection properties."""
from ..adapt import Buffer, Dumper, Loader, Format as Pg3Format
from ..proto import AdaptContext
from ..errors import InterfaceError, DataError
+from ..utils.compat import ZoneInfo
_PackInt = Callable[[int], bytes]
_UnpackInt = Callable[[bytes], Tuple[int]]
_pg_datetimetz_epoch = datetime(2000, 1, 1, tzinfo=timezone.utc)
_py_date_min_days = date.min.toordinal()
+_UTC = ZoneInfo("UTC")
+
class DateDumper(Dumper):
format = Format.TEXT
+ def __init__(self, oid: int, context: Optional[AdaptContext] = None):
+ super().__init__(oid, context)
+ self._timezone = self.connection.timezone if self.connection else _UTC
+
def _format_from_context(self) -> str:
ds = self._get_datestyle()
if ds.startswith(b"I"): # ISO
if data[-3] in (43, 45):
data += b"00"
- return super().load(data).astimezone(timezone.utc)
+ return super().load(data).astimezone(self._timezone)
def _load_py36(self, data: Buffer) -> datetime:
if isinstance(data, memoryview):
tzoff = -tzoff
rv = super().load(data[: m.start()])
- return (rv - tzoff).replace(tzinfo=timezone.utc)
+ return (rv - tzoff).replace(tzinfo=self._timezone)
def _load_notimpl(self, data: Buffer) -> datetime:
if isinstance(data, memoryview):
format = Format.BINARY
+ def __init__(self, oid: int, context: Optional[AdaptContext] = None):
+ super().__init__(oid, context)
+ self._timezone = self.connection.timezone if self.connection else _UTC
+
def load(self, data: Buffer) -> datetime:
micros = _unpack_int8(data)[0]
try:
- return _pg_datetimetz_epoch + timedelta(microseconds=micros)
+ ts = _pg_datetimetz_epoch + timedelta(microseconds=micros)
+ return ts.astimezone(self._timezone)
except OverflowError:
if micros <= 0:
raise DataError("timestamp too small (before year 1)")
cur.execute(f"set timezone to '{timezone}'")
got = cur.execute(f"select '{expr}'::timestamptz").fetchone()[0]
assert got == as_dt(val)
- assert got.tzinfo == dt.timezone.utc
@pytest.mark.parametrize("val, expr, timezone", load_datetimetz_samples)
cur.execute(f"set timezone to '{timezone}'")
got = cur.execute(f"select '{expr}'::timestamptz").fetchone()[0]
assert got == as_dt(val)
- assert got.tzinfo == dt.timezone.utc
@pytest.mark.xfail # parse timezone names