format = Format.TEXT
_re_format = re.compile(rb"^(\d+)[^\d](\d+)[^\d](\d+)$")
+ _ORDER_YMD = 0
+ _ORDER_DMY = 1
+ _ORDER_MDY = 2
+
def __init__(self, oid: int, context: Optional[AdaptContext] = None):
super().__init__(oid, context)
- self._order = self._order_from_context()
+ ds = _get_datestyle(self.connection)
+ if ds.startswith(b"I"): # ISO
+ self._order = self._ORDER_YMD
+ elif ds.startswith(b"G"): # German
+ self._order = self._ORDER_DMY
+ elif ds.startswith(b"S") or ds.startswith(b"P"): # SQL or Postgres
+ self._order = (
+ self._ORDER_DMY if ds.endswith(b"DMY") else self._ORDER_MDY
+ )
+ else:
+ raise InterfaceError(f"unexpected DateStyle: {ds.decode('ascii')}")
def load(self, data: Buffer) -> date:
m = self._re_format.match(data)
raise DataError(f"BC dates not supported, got {s!r}")
raise DataError(f"can't parse date {s!r}")
- t = m.groups()
- ye, mo, da = (t[i] for i in self._order)
+ if self._order == self._ORDER_YMD:
+ ye, mo, da = m.groups()
+ elif self._order == self._ORDER_DMY:
+ da, mo, ye = m.groups()
+ else:
+ mo, da, ye = m.groups()
try:
return date(int(ye), int(mo), int(da))
except ValueError as e:
s = bytes(data).decode("utf8", "replace")
raise DataError(f"can't manage date {s!r}: {e}")
- def _order_from_context(self) -> Tuple[int, int, int]:
- ds = _get_datestyle(self.connection)
- if ds.startswith(b"I"): # ISO
- return (0, 1, 2)
- elif ds.startswith(b"G"): # German
- return (2, 1, 0)
- elif ds.startswith(b"S") or ds.startswith(b"P"): # SQL or Postgres
- return (2, 1, 0) if ds.endswith(b"DMY") else (2, 0, 1)
- else:
- raise InterfaceError(f"unexpected DateStyle: {ds.decode('ascii')}")
-
class DateBinaryLoader(Loader):
class TimestampLoader(Loader):
format = Format.TEXT
+
_re_format = re.compile(
rb"""(?ix)
^
- (?:(\d+)|[a-z]+) [^a-z0-9] # DoW or first number, separator
- (\d+|[a-z]+) [^a-z0-9] # Month name or second number, separator
- (\d+|[a-z]+) # Month name or thrid number
- (?: T | [^a-z0-9] ) # Separator, including T
- (\d+) [^a-z0-9] # Other 3 numbers
- (\d+) [^a-z0-9]
- (\d+)
- (?: \.(\d+) )? # micros
- (?: [^a-z0-9] (\d+) )? # year in PG format
+ (\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+) # Date
+ (?: T | [^a-z0-9] ) # Separator, including T
+ (\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+) # Time
+ (?: \.(\d+) )? # Micros
$
"""
)
+ _re_format_pg = re.compile(
+ rb"""(?ix)
+ ^
+ [a-z]+ [^a-z0-9] # DoW, separator
+ (\d+|[a-z]+) [^a-z0-9] # Month or day
+ (\d+|[a-z]+) [^a-z0-9] # Month or day
+ (\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+) # Time
+ (?: \.(\d+) )? # Micros
+ [^a-z0-9] (\d+) # Year
+ $
+ """
+ )
+
+ _ORDER_YMD = 0
+ _ORDER_DMY = 1
+ _ORDER_MDY = 2
+ _ORDER_PGDM = 3
+ _ORDER_PGMD = 4
def __init__(self, oid: int, context: Optional[AdaptContext] = None):
super().__init__(oid, context)
- self._order = self._order_from_context()
+
+ ds = _get_datestyle(self.connection)
+ if ds.startswith(b"I"): # ISO
+ self._order = self._ORDER_YMD
+ elif ds.startswith(b"G"): # German
+ self._order = self._ORDER_DMY
+ elif ds.startswith(b"S"): # SQL
+ self._order = (
+ self._ORDER_DMY if ds.endswith(b"DMY") else self._ORDER_MDY
+ )
+ elif ds.startswith(b"P"): # Postgres
+ self._order = (
+ self._ORDER_PGDM if ds.endswith(b"DMY") else self._ORDER_PGMD
+ )
+ self._re_format = self._re_format_pg
+ else:
+ raise InterfaceError(f"unexpected DateStyle: {ds.decode('ascii')}")
def load(self, data: Buffer) -> datetime:
m = self._re_format.match(data)
raise DataError(f"BC timestamps not supported, got {s!r}")
raise DataError(f"can't parse timestamp {s!r}")
- t = m.groups()
- ye, mo, da, ho, mi, se, ms = (t[i] for i in self._order)
+ if self._order == self._ORDER_YMD:
+ ye, mo, da, ho, mi, se, ms = m.groups()
+ imo = int(mo)
+ elif self._order == self._ORDER_DMY:
+ da, mo, ye, ho, mi, se, ms = m.groups()
+ imo = int(mo)
+ elif self._order == self._ORDER_MDY:
+ mo, da, ye, ho, mi, se, ms = m.groups()
+ imo = int(mo)
+ else:
+ if self._order == self._ORDER_PGDM:
+ da, mo, ho, mi, se, ms, ye = m.groups()
+ else:
+ mo, da, ho, mi, se, ms, ye = m.groups()
+ try:
+ imo = _month_abbr[mo]
+ except KeyError:
+ s = mo.decode("utf8", "replace")
+ raise DataError(f"unexpected month: {s!r}")
# Pad the fraction of second to get millis
if ms:
else:
ims = 0
- if not b"0" <= mo[0:1] <= b"9":
- try:
- mo = _month_abbr[mo]
- except KeyError:
- s = mo.decode("utf8", "replace")
- raise DataError(f"unexpected month: {s!r}")
-
try:
return datetime(
- int(ye), int(mo), int(da), int(ho), int(mi), int(se), ims
+ int(ye), imo, int(da), int(ho), int(mi), int(se), ims
)
except ValueError as e:
s = bytes(data).decode("utf8", "replace")
raise DataError(f"can't manage timestamp {s!r}: {e}")
- def _order_from_context(self) -> Tuple[int, int, int, int, int, int, int]:
- ds = _get_datestyle(self.connection)
- if ds.startswith(b"I"): # ISO
- return (0, 1, 2, 3, 4, 5, 6)
- elif ds.startswith(b"G"): # German
- return (2, 1, 0, 3, 4, 5, 6)
- elif ds.startswith(b"S"): # SQL
- return (
- (2, 1, 0, 3, 4, 5, 6)
- if ds.endswith(b"DMY")
- else (2, 0, 1, 3, 4, 5, 6)
- )
- elif ds.startswith(b"P"): # Postgres
- return (
- (7, 2, 1, 3, 4, 5, 6)
- if ds.endswith(b"DMY")
- else (7, 1, 2, 3, 4, 5, 6)
- )
- else:
- raise InterfaceError(f"unexpected DateStyle: {ds.decode('ascii')}")
-
class TimestampBinaryLoader(Loader):
_month_abbr = {
- n: str(i).encode("utf8")
+ n: i
for i, n in enumerate(
b"Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split(), 1
)