import datetime
import decimal
import json
+import re
from .. import config
from .. import engines
("integer", None),
("float", 28.5),
("float", None),
+ (
+ "float",
+ 1234567.89,
+ ),
+ ("numeric", 1234567.89),
+ # this one "works" because the float value you see here is
+ # lost immediately to floating point stuff
+ ("numeric", 99998969694839.983485848, requirements.python3),
+ ("numeric", 99939.983485848, requirements.python3),
+ ("_decimal", decimal.Decimal("1234567.89")),
+ (
+ "_decimal",
+ decimal.Decimal("99998969694839.983485848"),
+ # fails on SQLite and MySQL (non-mariadb)
+ requirements.cast_precision_numerics_many_significant_digits,
+ ),
+ (
+ "_decimal",
+ decimal.Decimal("99939.983485848"),
+ ),
] + json_elements
def decorate(fn):
return decorate
- @_index_fixtures(False)
- def test_index_typed_access(self, datatype, value):
+ def _json_value_insert(self, connection, datatype, value, data_element):
data_table = self.tables.data_table
- data_element = {"key1": value}
- with config.db.begin() as conn:
- conn.execute(
+ if datatype == "_decimal":
+
+ # Python's builtin json serializer basically doesn't support
+ # Decimal objects without implicit float conversion period.
+ # users can otherwise use simplejson which supports
+ # precision decimals
+
+ # https://bugs.python.org/issue16535
+
+ # inserting as strings to avoid a new fixture around the
+ # dialect which would have idiosyncrasies for different
+ # backends.
+
+ class DecimalEncoder(json.JSONEncoder):
+ def default(self, o):
+ if isinstance(o, decimal.Decimal):
+ return str(o)
+ return super(DecimalEncoder, self).default(o)
+
+ json_data = json.dumps(data_element, cls=DecimalEncoder)
+
+ # take the quotes out. yup, there is *literally* no other
+ # way to get Python's json.dumps() to put all the digits in
+ # the string
+ json_data = re.sub(r'"(%s)"' % str(value), str(value), json_data)
+
+ datatype = "numeric"
+
+ connection.execute(
+ data_table.insert().values(
+ name="row1",
+ # to pass the string directly to every backend, including
+ # PostgreSQL which needs the value to be CAST as JSON
+ # both in the SQL as well as at the prepared statement
+ # level for asyncpg, while at the same time MySQL
+ # doesn't even support CAST for JSON, here we are
+ # sending the string embedded in the SQL without using
+ # a parameter.
+ data=bindparam(None, json_data, literal_execute=True),
+ nulldata=bindparam(None, json_data, literal_execute=True),
+ ),
+ )
+ else:
+ connection.execute(
data_table.insert(),
{
"name": "row1",
},
)
- expr = data_table.c.data["key1"]
+ p_s = None
+
+ if datatype:
+ if datatype == "numeric":
+ a, b = str(value).split(".")
+ s = len(b)
+ p = len(a) + s
+
+ if isinstance(value, decimal.Decimal):
+ compare_value = value
+ else:
+ compare_value = decimal.Decimal(str(value))
+
+ p_s = (p, s)
+ else:
+ compare_value = value
+ else:
+ compare_value = value
+
+ return datatype, compare_value, p_s
+
+ @_index_fixtures(False)
+ @testing.emits_warning(r".*does \*not\* support Decimal objects natively")
+ def test_index_typed_access(self, datatype, value):
+ data_table = self.tables.data_table
+ data_element = {"key1": value}
+ with config.db.begin() as conn:
+
+ datatype, compare_value, p_s = self._json_value_insert(
+ conn, datatype, value, data_element
+ )
+
+ expr = data_table.c.data["key1"]
if datatype:
- expr = getattr(expr, "as_%s" % datatype)()
+ if datatype == "numeric" and p_s:
+ expr = expr.as_numeric(*p_s)
+ else:
+ expr = getattr(expr, "as_%s" % datatype)()
roundtrip = conn.scalar(select(expr))
- eq_(roundtrip, value)
+ eq_(roundtrip, compare_value)
if util.py3k: # skip py2k to avoid comparing unicode to str etc.
- is_(type(roundtrip), type(value))
+ is_(type(roundtrip), type(compare_value))
@_index_fixtures(True)
+ @testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def test_index_typed_comparison(self, datatype, value):
data_table = self.tables.data_table
data_element = {"key1": value}
+
with config.db.begin() as conn:
- conn.execute(
- data_table.insert(),
- {
- "name": "row1",
- "data": data_element,
- "nulldata": data_element,
- },
+ datatype, compare_value, p_s = self._json_value_insert(
+ conn, datatype, value, data_element
)
expr = data_table.c.data["key1"]
if datatype:
- expr = getattr(expr, "as_%s" % datatype)()
+ if datatype == "numeric" and p_s:
+ expr = expr.as_numeric(*p_s)
+ else:
+ expr = getattr(expr, "as_%s" % datatype)()
- row = conn.execute(select(expr).where(expr == value)).first()
+ row = conn.execute(
+ select(expr).where(expr == compare_value)
+ ).first()
# make sure we get a row even if value is None
- eq_(row, (value,))
+ eq_(row, (compare_value,))
@_index_fixtures(True)
+ @testing.emits_warning(r".*does \*not\* support Decimal objects natively")
def test_path_typed_comparison(self, datatype, value):
data_table = self.tables.data_table
data_element = {"key1": {"subkey1": value}}
with config.db.begin() as conn:
- conn.execute(
- data_table.insert(),
- {
- "name": "row1",
- "data": data_element,
- "nulldata": data_element,
- },
+
+ datatype, compare_value, p_s = self._json_value_insert(
+ conn, datatype, value, data_element
)
expr = data_table.c.data[("key1", "subkey1")]
if datatype:
- expr = getattr(expr, "as_%s" % datatype)()
+ if datatype == "numeric" and p_s:
+ expr = expr.as_numeric(*p_s)
+ else:
+ expr = getattr(expr, "as_%s" % datatype)()
- row = conn.execute(select(expr).where(expr == value)).first()
+ row = conn.execute(
+ select(expr).where(expr == compare_value)
+ ).first()
# make sure we get a row even if value is None
- eq_(row, (value,))
+ eq_(row, (compare_value,))
@testing.combinations(
(True,),