def find_insert_problem(self, conn):
"""Context manager to help finding a problematic value."""
try:
- yield
+ with conn.transaction():
+ yield
except psycopg.DatabaseError:
- conn.rollback()
cur = conn.cursor()
# Repeat insert one field at time, until finding the wrong one
cur.execute(self.drop_stmt)
@asynccontextmanager
async def find_insert_problem_async(self, aconn):
try:
- yield
+ async with aconn.transaction():
+ yield
except psycopg.DatabaseError:
- await aconn.rollback()
acur = aconn.cursor()
# Repeat insert one field at time, until finding the wrong one
await acur.execute(self.drop_stmt)
def match_float(self, spec, got, want, approx=False, rel=None):
if got is not None and isnan(got):
assert isnan(want)
+ else:
+ if approx or self._server_rounds():
+ assert got == pytest.approx(want, rel=rel)
+ else:
+ assert got == want
+
+ def _server_rounds(self):
+ """Return True if the connected server perform float rounding"""
+ if self.conn.info.vendor == "CockroachDB":
+ return True
else:
# Versions older than 12 make some rounding. e.g. in Postgres 10.4
# select '-1.409006204063909e+112'::float8
# -> -1.40900620406391e+112
- if not approx and self.conn.info.server_version >= 120000:
- assert got == want
- else:
- assert got == pytest.approx(want, rel=rel)
+ return self.conn.info.server_version < 120000
def make_Float4(self, spec):
return spec(self.make_float(spec, double=False))
tz = self._make_tz(spec) if spec[1] else None
return dt.time(h, m, s, ms, tz)
+ CRDB_TIMEDELTA_MAX = dt.timedelta(days=1281239)
+
def make_timedelta(self, spec):
- return choice([dt.timedelta.min, dt.timedelta.max]) * random()
+ if self.conn.info.vendor == "CockroachDB":
+ rng = [-self.CRDB_TIMEDELTA_MAX, self.CRDB_TIMEDELTA_MAX]
+ else:
+ rng = [dt.timedelta.min, dt.timedelta.max]
+
+ return choice(rng) * random()
def schema_tuple(self, cls):
# TODO: this is a complicated matter as it would involve creating