import importlib
import itertools
+import random
from sqlalchemy import and_
from sqlalchemy import Boolean
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_false
+from sqlalchemy.testing import is_not_
from sqlalchemy.testing import is_true
from sqlalchemy.testing import ne_
from sqlalchemy.util import class_hierarchy
a=Integer, b=String, c=Integer
),
text("select a, b, c from table where foo=:bar").bindparams(
- bindparam("bar", Integer)
+ bindparam("bar", type_=Integer)
),
text("select a, b, c from table where foo=:foo").bindparams(
- bindparam("foo", Integer)
+ bindparam("foo", type_=Integer)
),
text("select a, b, c from table where foo=:bar").bindparams(
- bindparam("bar", String)
+ bindparam("bar", type_=String)
),
),
lambda: (
column("z") - column("x"),
column("x") - column("z"),
column("z") > column("x"),
+ column("x").in_([5, 7]),
+ column("x").in_([10, 7, 8]),
# note these two are mathematically equivalent but for now they
# are considered to be different
column("z") >= column("x"),
type_coerce(column("z", Integer), Float),
),
lambda: (table_a.c.a, table_b.c.a),
- lambda: (tuple_([1, 2]), tuple_([3, 4])),
+ lambda: (tuple_(1, 2), tuple_(3, 4)),
lambda: (func.array_agg([1, 2]), func.array_agg([3, 4])),
lambda: (
func.percentile_cont(0.5).within_group(table_a.c.a),
lambda: (table_a, table_b),
]
+ dont_compare_values_fixtures = [
+ lambda: (
+ # same number of params each time, so compare for IN
+ # with legacy behavior of bind for each value works
+ column("x").in_(random.choices(range(10), k=3)),
+ # expanding IN places the whole list into a single parameter
+ # so it can be of arbitrary length as well
+ column("x").in_(
+ bindparam(
+ "q",
+ random.choices(range(10), k=random.randint(0, 7)),
+ expanding=True,
+ )
+ ),
+ column("x") == random.randint(1, 10),
+ )
+ ]
+
def _complex_fixtures():
def one():
a1 = table_a.alias()
class CacheKeyFixture(object):
- def _run_cache_key_fixture(self, fixture):
+ def _run_cache_key_fixture(self, fixture, compare_values):
case_a = fixture()
case_b = fixture()
if a == b:
a_key = case_a[a]._generate_cache_key()
b_key = case_b[b]._generate_cache_key()
+ is_not_(a_key, None)
+ is_not_(b_key, None)
+
eq_(a_key.key, b_key.key)
eq_(hash(a_key), hash(b_key))
for a_param, b_param in zip(
a_key.bindparams, b_key.bindparams
):
- assert a_param.compare(b_param, compare_values=False)
+ assert a_param.compare(
+ b_param, compare_values=compare_values
+ )
else:
a_key = case_a[a]._generate_cache_key()
b_key = case_b[b]._generate_cache_key()
for a_param, b_param in zip(
a_key.bindparams, b_key.bindparams
):
- if not a_param.compare(b_param, compare_values=True):
+ if not a_param.compare(
+ b_param, compare_values=compare_values
+ ):
break
else:
# this fails unconditionally since we could not
)
# note we're asserting the order of the params as well as
- # if there are dupes or not. ordering has to be deterministic
- # and matches what a traversal would provide.
- # regular traverse_depthfirst does produce dupes in cases like
+ # if there are dupes or not. ordering has to be
+ # deterministic and matches what a traversal would provide.
+ # regular traverse_depthfirst does produce dupes in cases
+ # like
# select([some_alias]).
# select_from(join(some_alias, other_table))
# where a bound parameter is inside of some_alias. the
class CacheKeyTest(CacheKeyFixture, CoreFixtures, fixtures.TestBase):
def test_cache_key(self):
- for fixture in self.fixtures:
- self._run_cache_key_fixture(fixture)
+ for fixtures_, compare_values in [
+ (self.fixtures, True),
+ (self.dont_compare_values_fixtures, False),
+ ]:
+ for fixture in fixtures_:
+ self._run_cache_key_fixture(fixture, compare_values)
def test_cache_key_unknown_traverse(self):
class Foobar1(ClauseElement):
and "crud" not in cls.__module__
and "dialects" not in cls.__module__ # TODO: dialects?
).difference({ColumnElement, UnaryExpression})
- for fixture in self.fixtures:
+
+ for fixture in self.fixtures + self.dont_compare_values_fixtures:
case_a = fixture()
for elem in case_a:
for mro in type(elem).__mro__:
is_false(bool(need), "%d Remaining classes: %r" % (len(need), need))
- def test_compare(self):
- for fixture in self.fixtures:
- case_a = fixture()
- case_b = fixture()
-
- for a, b in itertools.combinations_with_replacement(
- range(len(case_a)), 2
- ):
- if a == b:
- is_true(
- case_a[a].compare(case_b[b], compare_annotations=True),
- "%r != %r" % (case_a[a], case_b[b]),
- )
+ def test_compare_labels(self):
+ for fixtures_, compare_values in [
+ (self.fixtures, True),
+ (self.dont_compare_values_fixtures, False),
+ ]:
+ for fixture in fixtures_:
+ case_a = fixture()
+ case_b = fixture()
+
+ for a, b in itertools.combinations_with_replacement(
+ range(len(case_a)), 2
+ ):
+ if a == b:
+ is_true(
+ case_a[a].compare(
+ case_b[b],
+ compare_annotations=True,
+ compare_values=compare_values,
+ ),
+ "%r != %r" % (case_a[a], case_b[b]),
+ )
- else:
- is_false(
- case_a[a].compare(case_b[b], compare_annotations=True),
- "%r == %r" % (case_a[a], case_b[b]),
- )
+ else:
+ is_false(
+ case_a[a].compare(
+ case_b[b],
+ compare_annotations=True,
+ compare_values=compare_values,
+ ),
+ "%r == %r" % (case_a[a], case_b[b]),
+ )
def test_compare_col_identity(self):
stmt1 = (
)
def test_copy_internals(self):
- for fixture in self.fixtures:
- case_a = fixture()
- case_b = fixture()
-
- assert case_a[0].compare(case_b[0])
+ for fixtures_, compare_values in [
+ (self.fixtures, True),
+ (self.dont_compare_values_fixtures, False),
+ ]:
+ for fixture in fixtures_:
+ case_a = fixture()
+ case_b = fixture()
+
+ assert case_a[0].compare(
+ case_b[0], compare_values=compare_values
+ )
- clone = visitors.replacement_traverse(
- case_a[0], {}, lambda elem: None
- )
+ clone = visitors.replacement_traverse(
+ case_a[0], {}, lambda elem: None
+ )
- assert clone.compare(case_b[0])
-
- stack = [clone]
- seen = {clone}
- found_elements = False
- while stack:
- obj = stack.pop(0)
-
- items = [
- subelem
- for key, elem in clone.__dict__.items()
- if key != "_is_clone_of" and elem is not None
- for subelem in util.to_list(elem)
- if (
- isinstance(subelem, (ColumnElement, ClauseList))
- and subelem not in seen
- and not isinstance(subelem, Immutable)
- and subelem is not case_a[0]
+ assert clone.compare(case_b[0], compare_values=compare_values)
+
+ stack = [clone]
+ seen = {clone}
+ found_elements = False
+ while stack:
+ obj = stack.pop(0)
+
+ items = [
+ subelem
+ for key, elem in clone.__dict__.items()
+ if key != "_is_clone_of" and elem is not None
+ for subelem in util.to_list(elem)
+ if (
+ isinstance(subelem, (ColumnElement, ClauseList))
+ and subelem not in seen
+ and not isinstance(subelem, Immutable)
+ and subelem is not case_a[0]
+ )
+ ]
+ stack.extend(items)
+ seen.update(items)
+
+ if obj is not clone:
+ found_elements = True
+ # ensure the element will not compare as true
+ obj.compare = lambda other, **kw: False
+ obj.__visit_name__ = "dont_match"
+
+ if found_elements:
+ assert not clone.compare(
+ case_b[0], compare_values=compare_values
)
- ]
- stack.extend(items)
- seen.update(items)
-
- if obj is not clone:
- found_elements = True
- # ensure the element will not compare as true
- obj.compare = lambda other, **kw: False
- obj.__visit_name__ = "dont_match"
-
- if found_elements:
- assert not clone.compare(case_b[0])
- assert case_a[0].compare(case_b[0])
+ assert case_a[0].compare(
+ case_b[0], compare_values=compare_values
+ )
class CompareClausesTest(fixtures.TestBase):