rb"""(?ix)
^
(\d+) : (\d+) : (\d+) (?: \. (\d+) )? # Time and micros
- (-|\+) (\d+) (?: : (\d+) )? (?: : (\d+) )? # Timezone
+ ([-+]) (\d+) (?: : (\d+) )? (?: : (\d+) )? # Timezone
$
"""
)
(?: T | [^a-z0-9] ) # Separator, including T
(\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+) # Time
(?: \.(\d+) )? # Micros
- (-|\+) (\d+) (?: : (\d+) )? (?: : (\d+) )? # Timezone
+ ([-+]) (\d+) (?: : (\d+) )? (?: : (\d+) )? # Timezone
$
"""
)
_re_interval = re.compile(
br"""
- (?: (?P<years> [-+]?\d+) \s+ years? \s* )?
- (?: (?P<months> [-+]?\d+) \s+ mons? \s* )?
- (?: (?P<days> [-+]?\d+) \s+ days? \s* )?
- (?: (?P<hsign> [-+])?
- (?P<hours> \d+ )
- : (?P<minutes> \d+ )
- : (?P<seconds> \d+ (?:\.\d+)? )
+ (?: ([-+]?\d+) \s+ years? \s* )? # Years
+ (?: ([-+]?\d+) \s+ mons? \s* )? # Months
+ (?: ([-+]?\d+) \s+ days? \s* )? # Days
+ (?: ([-+])? (\d+) : (\d+) : (\d+ (?:\.\d+)?) # Time
)?
""",
re.VERBOSE,
def load(self, data: Buffer) -> timedelta:
m = self._re_interval.match(data)
if not m:
- raise ValueError("can't parse interval: {data.decode('ascii')}")
+ s = bytes(data).decode("utf8", "replace")
+ raise DataError(f"can't parse interval {s!r}")
+ ye, mo, da, sgn, ho, mi, se = m.groups()
days = 0
seconds = 0.0
- tmp = m.group("years")
- if tmp:
- days += 365 * int(tmp)
-
- tmp = m.group("months")
- if tmp:
- days += 30 * int(tmp)
+ if ye:
+ days += 365 * int(ye)
+ if mo:
+ days += 30 * int(mo)
+ if da:
+ days += int(da)
- tmp = m.group("days")
- if tmp:
- days += int(tmp)
-
- if m.group("hours"):
- seconds = (
- 3600 * int(m.group("hours"))
- + 60 * int(m.group("minutes"))
- + float(m.group("seconds"))
- )
- if m.group("hsign") == b"-":
+ if ho:
+ seconds = 3600 * int(ho) + 60 * int(mi) + float(se)
+ if sgn == b"-":
seconds = -seconds
try:
return timedelta(days=days, seconds=seconds)
except OverflowError as e:
- raise DataError(str(e))
+ s = bytes(data).decode("utf8", "replace")
+ raise DataError(f"can't manage interval {s!r}: {e}")
def _load_notimpl(self, data: Buffer) -> timedelta:
- if isinstance(data, memoryview):
- data = bytes(data)
+ s = bytes(data).decode("utf8", "replace")
ints = (
self.connection
and self.connection.pgconn.parameter_status(b"IntervalStyle")
or b"unknown"
- )
+ ).decode("utf8", "replace")
raise NotImplementedError(
- "can't parse interval with IntervalStyle"
- f" {ints.decode('ascii')}: {data.decode('ascii')}"
+ f"can't parse interval with IntervalStyle {ints}: {s!r}"
)