From: Mike Bayer Date: Wed, 4 Sep 2019 20:23:14 +0000 (-0400) Subject: Backport refactor of bindparams tests X-Git-Tag: rel_1_3_9~22 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5072ed04520631310be9c5f273bf13210c64d37a;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Backport refactor of bindparams tests These move into their own class inside of test_compiler which will help with upcoming patch for #4837 Change-Id: Iad828d4a0d01d82bfb33213cedfc5b79d1860507 --- diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index 37e5f980d2..e56647da40 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -312,20 +312,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): checkparams=params, ) - def test_limit_offset_select_literal_binds(self): - stmt = select([1]).limit(5).offset(6) - self.assert_compile( - stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True - ) - - def test_limit_offset_compound_select_literal_binds(self): - stmt = select([1]).union(select([2])).limit(5).offset(6) - self.assert_compile( - stmt, - "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6", - literal_binds=True, - ) - def test_select_precol_compile_ordering(self): s1 = select([column("x")]).select_from(text("a")).limit(5).as_scalar() s2 = select([s1]).limit(10) @@ -1321,20 +1307,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT mytable.myid FROM mytable", ) - def test_multiple_col_binds(self): - self.assert_compile( - select( - [literal_column("*")], - or_( - table1.c.myid == 12, - table1.c.myid == "asdf", - table1.c.myid == "foo", - ), - ), - "SELECT * FROM mytable WHERE mytable.myid = :myid_1 " - "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3", - ) - def test_order_by_nulls(self): self.assert_compile( table2.select( @@ -1604,71 +1576,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): dialect=mysql.dialect(), ) - def test_render_binds_as_literal(self): - """test a compiler that renders binds inline into - SQL in the columns clause.""" - - dialect = default.DefaultDialect() - - class Compiler(dialect.statement_compiler): - ansi_bind_rules = True - - dialect.statement_compiler = Compiler - - self.assert_compile( - select([literal("someliteral")]), - "SELECT 'someliteral' AS anon_1", - dialect=dialect, - ) - - self.assert_compile( - select([table1.c.myid + 3]), - "SELECT mytable.myid + 3 AS anon_1 FROM mytable", - dialect=dialect, - ) - - self.assert_compile( - select([table1.c.myid.in_([4, 5, 6])]), - "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable", - dialect=dialect, - ) - - self.assert_compile( - select([func.mod(table1.c.myid, 5)]), - "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable", - dialect=dialect, - ) - - self.assert_compile( - select([literal("foo").in_([])]), - "SELECT 1 != 1 AS anon_1", - dialect=dialect, - ) - - self.assert_compile( - select([literal(util.b("foo"))]), - "SELECT 'foo' AS anon_1", - dialect=dialect, - ) - - # test callable - self.assert_compile( - select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]), - "SELECT mytable.myid = 5 AS anon_1 FROM mytable", - dialect=dialect, - ) - - empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic") - empty_in_dialect.statement_compiler = Compiler - - assert_raises_message( - exc.CompileError, - "Bind parameter 'foo' without a " - "renderable value not allowed here.", - bindparam("foo").in_([]).compile, - dialect=empty_in_dialect, - ) - def test_collate(self): # columns clause self.assert_compile( @@ -2150,383 +2057,20 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): " LIMIT -1 OFFSET :param_2) AS anon_2", ) - def test_binds(self): - for ( - stmt, - expected_named_stmt, - expected_positional_stmt, - expected_default_params_dict, - expected_default_params_list, - test_param_dict, - expected_test_params_dict, - expected_test_params_list, - ) in [ - ( - select( - [table1, table2], - and_( - table1.c.myid == table2.c.otherid, - table1.c.name == bindparam("mytablename"), - ), - ), - "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, myothertable.othername FROM mytable, " - "myothertable WHERE mytable.myid = myothertable.otherid " - "AND mytable.name = :mytablename", - "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, myothertable.othername FROM mytable, " - "myothertable WHERE mytable.myid = myothertable.otherid AND " - "mytable.name = ?", - {"mytablename": None}, - [None], - {"mytablename": 5}, - {"mytablename": 5}, - [5], - ), - ( - select( - [table1], - or_( - table1.c.myid == bindparam("myid"), - table2.c.otherid == bindparam("myid"), - ), - ), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable, myothertable WHERE mytable.myid = :myid " - "OR myothertable.otherid = :myid", - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable, myothertable WHERE mytable.myid = ? " - "OR myothertable.otherid = ?", - {"myid": None}, - [None, None], - {"myid": 5}, - {"myid": 5}, - [5, 5], - ), - ( - text( - "SELECT mytable.myid, mytable.name, " - "mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = :myid OR " - "myothertable.otherid = :myid" - ), - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = :myid OR " - "myothertable.otherid = :myid", - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = ? OR " - "myothertable.otherid = ?", - {"myid": None}, - [None, None], - {"myid": 5}, - {"myid": 5}, - [5, 5], - ), - ( - select( - [table1], - or_( - table1.c.myid == bindparam("myid", unique=True), - table2.c.otherid == bindparam("myid", unique=True), - ), - ), - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = " - ":myid_1 OR myothertable.otherid = :myid_2", - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = ? " - "OR myothertable.otherid = ?", - {"myid_1": None, "myid_2": None}, - [None, None], - {"myid_1": 5, "myid_2": 6}, - {"myid_1": 5, "myid_2": 6}, - [5, 6], - ), - ( - bindparam("test", type_=String, required=False) + text("'hi'"), - ":test || 'hi'", - "? || 'hi'", - {"test": None}, - [None], - {}, - {"test": None}, - [None], - ), - ( - # testing select.params() here - bindparam() objects - # must get required flag set to False - select( - [table1], - or_( - table1.c.myid == bindparam("myid"), - table2.c.otherid == bindparam("myotherid"), - ), - ).params({"myid": 8, "myotherid": 7}), - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = " - ":myid OR myothertable.otherid = :myotherid", - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = " - "? OR myothertable.otherid = ?", - {"myid": 8, "myotherid": 7}, - [8, 7], - {"myid": 5}, - {"myid": 5, "myotherid": 7}, - [5, 7], - ), - ( - select( - [table1], - or_( - table1.c.myid - == bindparam("myid", value=7, unique=True), - table2.c.otherid - == bindparam("myid", value=8, unique=True), - ), - ), - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = " - ":myid_1 OR myothertable.otherid = :myid_2", - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = " - "? OR myothertable.otherid = ?", - {"myid_1": 7, "myid_2": 8}, - [7, 8], - {"myid_1": 5, "myid_2": 6}, - {"myid_1": 5, "myid_2": 6}, - [5, 6], - ), - ]: + def test_cast(self): + tbl = table( + "casttest", + column("id", Integer), + column("v1", Float), + column("v2", Float), + column("ts", TIMESTAMP), + ) - self.assert_compile( - stmt, expected_named_stmt, params=expected_default_params_dict - ) - self.assert_compile( - stmt, expected_positional_stmt, dialect=sqlite.dialect() - ) - nonpositional = stmt.compile() - positional = stmt.compile(dialect=sqlite.dialect()) - pp = positional.params - eq_( - [pp[k] for k in positional.positiontup], - expected_default_params_list, - ) - - eq_( - nonpositional.construct_params(test_param_dict), - expected_test_params_dict, - ) - pp = positional.construct_params(test_param_dict) - eq_( - [pp[k] for k in positional.positiontup], - expected_test_params_list, - ) - - # check that params() doesn't modify original statement - s = select( - [table1], - or_( - table1.c.myid == bindparam("myid"), - table2.c.otherid == bindparam("myotherid"), - ), - ) - s2 = s.params({"myid": 8, "myotherid": 7}) - s3 = s2.params({"myid": 9}) - assert s.compile().params == {"myid": None, "myotherid": None} - assert s2.compile().params == {"myid": 8, "myotherid": 7} - assert s3.compile().params == {"myid": 9, "myotherid": 7} - - # test using same 'unique' param object twice in one compile - s = select([table1.c.myid]).where(table1.c.myid == 12).as_scalar() - s2 = select([table1, s], table1.c.myid == s) - self.assert_compile( - s2, - "SELECT mytable.myid, mytable.name, mytable.description, " - "(SELECT mytable.myid FROM mytable WHERE mytable.myid = " - ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = " - "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)", - ) - positional = s2.compile(dialect=sqlite.dialect()) - - pp = positional.params - assert [pp[k] for k in positional.positiontup] == [12, 12] - - # check that conflicts with "unique" params are caught - s = select( - [table1], - or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")), - ) - assert_raises_message( - exc.CompileError, - "conflicts with unique bind parameter " "of the same name", - str, - s, - ) - - s = select( - [table1], - or_( - table1.c.myid == 7, - table1.c.myid == 8, - table1.c.myid == bindparam("myid_1"), - ), - ) - assert_raises_message( - exc.CompileError, - "conflicts with unique bind parameter " "of the same name", - str, - s, - ) - - def _test_binds_no_hash_collision(self): - """test that construct_params doesn't corrupt dict - due to hash collisions""" - - total_params = 100000 - - in_clause = [":in%d" % i for i in range(total_params)] - params = dict(("in%d" % i, i) for i in range(total_params)) - t = text("text clause %s" % ", ".join(in_clause)) - eq_(len(t.bindparams), total_params) - c = t.compile() - pp = c.construct_params(params) - eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp))) - eq_(len(set(pp.values())), total_params) - - def test_bind_as_col(self): - t = table("foo", column("id")) - - s = select([t, literal("lala").label("hoho")]) - self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo") - - assert [str(c) for c in s.c] == ["id", "hoho"] - - def test_bind_callable(self): - expr = column("x") == bindparam("key", callable_=lambda: 12) - self.assert_compile(expr, "x = :key", {"x": 12}) - - def test_bind_params_missing(self): - assert_raises_message( - exc.InvalidRequestError, - r"A value is required for bind parameter 'x'", - select([table1]) - .where( - and_( - table1.c.myid == bindparam("x", required=True), - table1.c.name == bindparam("y", required=True), - ) - ) - .compile() - .construct_params, - params=dict(y=5), - ) - - assert_raises_message( - exc.InvalidRequestError, - r"A value is required for bind parameter 'x'", - select([table1]) - .where(table1.c.myid == bindparam("x", required=True)) - .compile() - .construct_params, - ) - - assert_raises_message( - exc.InvalidRequestError, - r"A value is required for bind parameter 'x', " - "in parameter group 2", - select([table1]) - .where( - and_( - table1.c.myid == bindparam("x", required=True), - table1.c.name == bindparam("y", required=True), - ) - ) - .compile() - .construct_params, - params=dict(y=5), - _group_number=2, - ) - - assert_raises_message( - exc.InvalidRequestError, - r"A value is required for bind parameter 'x', " - "in parameter group 2", - select([table1]) - .where(table1.c.myid == bindparam("x", required=True)) - .compile() - .construct_params, - _group_number=2, - ) - - def test_tuple(self): - self.assert_compile( - tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]), - "(mytable.myid, mytable.name) IN " - "((:param_1, :param_2), (:param_3, :param_4))", - ) - - dialect = default.DefaultDialect() - dialect.tuple_in_values = True - self.assert_compile( - tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]), - "(mytable.myid, mytable.name) IN " - "(VALUES (:param_1, :param_2), (:param_3, :param_4))", - dialect=dialect, - ) - - self.assert_compile( - tuple_(table1.c.myid, table1.c.name).in_( - [tuple_(table2.c.otherid, table2.c.othername)] - ), - "(mytable.myid, mytable.name) IN " - "((myothertable.otherid, myothertable.othername))", - ) - - self.assert_compile( - tuple_(table1.c.myid, table1.c.name).in_( - select([table2.c.otherid, table2.c.othername]) - ), - "(mytable.myid, mytable.name) IN (SELECT " - "myothertable.otherid, myothertable.othername FROM myothertable)", - ) - - def test_expanding_parameter(self): - self.assert_compile( - tuple_(table1.c.myid, table1.c.name).in_( - bindparam("foo", expanding=True) - ), - "(mytable.myid, mytable.name) IN ([EXPANDING_foo])", - ) - - dialect = default.DefaultDialect() - dialect.tuple_in_values = True - self.assert_compile( - tuple_(table1.c.myid, table1.c.name).in_( - bindparam("foo", expanding=True) - ), - "(mytable.myid, mytable.name) IN ([EXPANDING_foo])", - dialect=dialect, - ) - - self.assert_compile( - table1.c.myid.in_(bindparam("foo", expanding=True)), - "mytable.myid IN ([EXPANDING_foo])", - ) - - def test_cast(self): - tbl = table( - "casttest", - column("id", Integer), - column("v1", Float), - column("v2", Float), - column("ts", TIMESTAMP), - ) - - def check_results(dialect, expected_results, literal): - eq_( - len(expected_results), - 5, - "Incorrect number of expected results", + def check_results(dialect, expected_results, literal): + eq_( + len(expected_results), + 5, + "Incorrect number of expected results", ) eq_( str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), @@ -3180,6 +2724,485 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises(exc.ArgumentError, and_, ("a",), ("b",)) +class BindParameterTest(AssertsCompiledSQL, fixtures.TestBase): + __dialect__ = "default" + + def test_binds(self): + for ( + stmt, + expected_named_stmt, + expected_positional_stmt, + expected_default_params_dict, + expected_default_params_list, + test_param_dict, + expected_test_params_dict, + expected_test_params_list, + ) in [ + ( + select( + [table1, table2], + and_( + table1.c.myid == table2.c.otherid, + table1.c.name == bindparam("mytablename"), + ), + ), + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, myothertable.othername FROM mytable, " + "myothertable WHERE mytable.myid = myothertable.otherid " + "AND mytable.name = :mytablename", + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, myothertable.othername FROM mytable, " + "myothertable WHERE mytable.myid = myothertable.otherid AND " + "mytable.name = ?", + {"mytablename": None}, + [None], + {"mytablename": 5}, + {"mytablename": 5}, + [5], + ), + ( + select( + [table1], + or_( + table1.c.myid == bindparam("myid"), + table2.c.otherid == bindparam("myid"), + ), + ), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable, myothertable WHERE mytable.myid = :myid " + "OR myothertable.otherid = :myid", + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable, myothertable WHERE mytable.myid = ? " + "OR myothertable.otherid = ?", + {"myid": None}, + [None, None], + {"myid": 5}, + {"myid": 5}, + [5, 5], + ), + ( + text( + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = :myid OR " + "myothertable.otherid = :myid" + ), + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = :myid OR " + "myothertable.otherid = :myid", + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = ? OR " + "myothertable.otherid = ?", + {"myid": None}, + [None, None], + {"myid": 5}, + {"myid": 5}, + [5, 5], + ), + ( + select( + [table1], + or_( + table1.c.myid == bindparam("myid", unique=True), + table2.c.otherid == bindparam("myid", unique=True), + ), + ), + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = " + ":myid_1 OR myothertable.otherid = :myid_2", + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = ? " + "OR myothertable.otherid = ?", + {"myid_1": None, "myid_2": None}, + [None, None], + {"myid_1": 5, "myid_2": 6}, + {"myid_1": 5, "myid_2": 6}, + [5, 6], + ), + ( + bindparam("test", type_=String, required=False) + text("'hi'"), + ":test || 'hi'", + "? || 'hi'", + {"test": None}, + [None], + {}, + {"test": None}, + [None], + ), + ( + # testing select.params() here - bindparam() objects + # must get required flag set to False + select( + [table1], + or_( + table1.c.myid == bindparam("myid"), + table2.c.otherid == bindparam("myotherid"), + ), + ).params({"myid": 8, "myotherid": 7}), + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = " + ":myid OR myothertable.otherid = :myotherid", + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = " + "? OR myothertable.otherid = ?", + {"myid": 8, "myotherid": 7}, + [8, 7], + {"myid": 5}, + {"myid": 5, "myotherid": 7}, + [5, 7], + ), + ( + select( + [table1], + or_( + table1.c.myid + == bindparam("myid", value=7, unique=True), + table2.c.otherid + == bindparam("myid", value=8, unique=True), + ), + ), + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = " + ":myid_1 OR myothertable.otherid = :myid_2", + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = " + "? OR myothertable.otherid = ?", + {"myid_1": 7, "myid_2": 8}, + [7, 8], + {"myid_1": 5, "myid_2": 6}, + {"myid_1": 5, "myid_2": 6}, + [5, 6], + ), + ]: + + self.assert_compile( + stmt, expected_named_stmt, params=expected_default_params_dict + ) + self.assert_compile( + stmt, expected_positional_stmt, dialect=sqlite.dialect() + ) + nonpositional = stmt.compile() + positional = stmt.compile(dialect=sqlite.dialect()) + pp = positional.params + eq_( + [pp[k] for k in positional.positiontup], + expected_default_params_list, + ) + + eq_( + nonpositional.construct_params(test_param_dict), + expected_test_params_dict, + ) + pp = positional.construct_params(test_param_dict) + eq_( + [pp[k] for k in positional.positiontup], + expected_test_params_list, + ) + + # check that params() doesn't modify original statement + s = select( + [table1], + or_( + table1.c.myid == bindparam("myid"), + table2.c.otherid == bindparam("myotherid"), + ), + ) + s2 = s.params({"myid": 8, "myotherid": 7}) + s3 = s2.params({"myid": 9}) + assert s.compile().params == {"myid": None, "myotherid": None} + assert s2.compile().params == {"myid": 8, "myotherid": 7} + assert s3.compile().params == {"myid": 9, "myotherid": 7} + + # test using same 'unique' param object twice in one compile + s = select([table1.c.myid]).where(table1.c.myid == 12).as_scalar() + s2 = select([table1, s], table1.c.myid == s) + self.assert_compile( + s2, + "SELECT mytable.myid, mytable.name, mytable.description, " + "(SELECT mytable.myid FROM mytable WHERE mytable.myid = " + ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = " + "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)", + ) + positional = s2.compile(dialect=sqlite.dialect()) + + pp = positional.params + assert [pp[k] for k in positional.positiontup] == [12, 12] + + # check that conflicts with "unique" params are caught + s = select( + [table1], + or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")), + ) + assert_raises_message( + exc.CompileError, + "conflicts with unique bind parameter " "of the same name", + str, + s, + ) + + s = select( + [table1], + or_( + table1.c.myid == 7, + table1.c.myid == 8, + table1.c.myid == bindparam("myid_1"), + ), + ) + assert_raises_message( + exc.CompileError, + "conflicts with unique bind parameter " "of the same name", + str, + s, + ) + + def _test_binds_no_hash_collision(self): + """test that construct_params doesn't corrupt dict + due to hash collisions""" + + total_params = 100000 + + in_clause = [":in%d" % i for i in range(total_params)] + params = dict(("in%d" % i, i) for i in range(total_params)) + t = text("text clause %s" % ", ".join(in_clause)) + eq_(len(t.bindparams), total_params) + c = t.compile() + pp = c.construct_params(params) + eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp))) + eq_(len(set(pp.values())), total_params) + + def test_bind_as_col(self): + t = table("foo", column("id")) + + s = select([t, literal("lala").label("hoho")]) + self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo") + + assert [str(c) for c in s.alias().c] == ["anon_1.id", "anon_1.hoho"] + + def test_bind_callable(self): + expr = column("x") == bindparam("key", callable_=lambda: 12) + self.assert_compile(expr, "x = :key", {"x": 12}) + + def test_bind_params_missing(self): + assert_raises_message( + exc.InvalidRequestError, + r"A value is required for bind parameter 'x'", + select([table1]) + .where( + and_( + table1.c.myid == bindparam("x", required=True), + table1.c.name == bindparam("y", required=True), + ) + ) + .compile() + .construct_params, + params=dict(y=5), + ) + + assert_raises_message( + exc.InvalidRequestError, + r"A value is required for bind parameter 'x'", + select([table1]) + .where(table1.c.myid == bindparam("x", required=True)) + .compile() + .construct_params, + ) + + assert_raises_message( + exc.InvalidRequestError, + r"A value is required for bind parameter 'x', " + "in parameter group 2", + select([table1]) + .where( + and_( + table1.c.myid == bindparam("x", required=True), + table1.c.name == bindparam("y", required=True), + ) + ) + .compile() + .construct_params, + params=dict(y=5), + _group_number=2, + ) + + assert_raises_message( + exc.InvalidRequestError, + r"A value is required for bind parameter 'x', " + "in parameter group 2", + select([table1]) + .where(table1.c.myid == bindparam("x", required=True)) + .compile() + .construct_params, + _group_number=2, + ) + + def test_tuple(self): + self.assert_compile( + tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]), + "(mytable.myid, mytable.name) IN " + "((:param_1, :param_2), (:param_3, :param_4))", + ) + + dialect = default.DefaultDialect() + dialect.tuple_in_values = True + self.assert_compile( + tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]), + "(mytable.myid, mytable.name) IN " + "(VALUES (:param_1, :param_2), (:param_3, :param_4))", + dialect=dialect, + ) + + self.assert_compile( + tuple_(table1.c.myid, table1.c.name).in_( + [tuple_(table2.c.otherid, table2.c.othername)] + ), + "(mytable.myid, mytable.name) IN " + "((myothertable.otherid, myothertable.othername))", + ) + + self.assert_compile( + tuple_(table1.c.myid, table1.c.name).in_( + select([table2.c.otherid, table2.c.othername]) + ), + "(mytable.myid, mytable.name) IN (SELECT " + "myothertable.otherid, myothertable.othername FROM myothertable)", + ) + + def test_expanding_parameter(self): + self.assert_compile( + tuple_(table1.c.myid, table1.c.name).in_( + bindparam("foo", expanding=True) + ), + "(mytable.myid, mytable.name) IN ([EXPANDING_foo])", + ) + + dialect = default.DefaultDialect() + dialect.tuple_in_values = True + self.assert_compile( + tuple_(table1.c.myid, table1.c.name).in_( + bindparam("foo", expanding=True) + ), + "(mytable.myid, mytable.name) IN ([EXPANDING_foo])", + dialect=dialect, + ) + + self.assert_compile( + table1.c.myid.in_(bindparam("foo", expanding=True)), + "mytable.myid IN ([EXPANDING_foo])", + ) + + def test_limit_offset_select_literal_binds(self): + stmt = select([1]).limit(5).offset(6) + self.assert_compile( + stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True + ) + + def test_limit_offset_compound_select_literal_binds(self): + stmt = select([1]).union(select([2])).limit(5).offset(6) + self.assert_compile( + stmt, + "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6", + literal_binds=True, + ) + + def test_multiple_col_binds(self): + self.assert_compile( + select( + [literal_column("*")], + or_( + table1.c.myid == 12, + table1.c.myid == "asdf", + table1.c.myid == "foo", + ), + ), + "SELECT * FROM mytable WHERE mytable.myid = :myid_1 " + "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3", + ) + + def test_render_binds_as_literal(self): + """test a compiler that renders binds inline into + SQL in the columns clause.""" + + dialect = default.DefaultDialect() + + class Compiler(dialect.statement_compiler): + ansi_bind_rules = True + + dialect.statement_compiler = Compiler + + self.assert_compile( + select([literal("someliteral")]), + "SELECT 'someliteral' AS anon_1", + dialect=dialect, + ) + + self.assert_compile( + select([table1.c.myid + 3]), + "SELECT mytable.myid + 3 AS anon_1 FROM mytable", + dialect=dialect, + ) + + self.assert_compile( + select([table1.c.myid.in_([4, 5, 6])]), + "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable", + dialect=dialect, + ) + + self.assert_compile( + select([func.mod(table1.c.myid, 5)]), + "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable", + dialect=dialect, + ) + + self.assert_compile( + select([literal("foo").in_([])]), + "SELECT 1 != 1 AS anon_1", + dialect=dialect, + ) + + self.assert_compile( + select([literal(util.b("foo"))]), + "SELECT 'foo' AS anon_1", + dialect=dialect, + ) + + # test callable + self.assert_compile( + select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]), + "SELECT mytable.myid = 5 AS anon_1 FROM mytable", + dialect=dialect, + ) + + empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic") + empty_in_dialect.statement_compiler = Compiler + + assert_raises_message( + exc.CompileError, + "Bind parameter 'foo' without a " + "renderable value not allowed here.", + bindparam("foo").in_([]).compile, + dialect=empty_in_dialect, + ) + + def test_render_expanding_parameter(self): + self.assert_compile( + select([table1.c.myid]).where( + table1.c.myid.in_(bindparam("foo", expanding=True)) + ), + "SELECT mytable.myid FROM mytable " + "WHERE mytable.myid IN ([EXPANDING_foo])", + ) + + def test_render_expanding_parameter_literal_binds(self): + self.assert_compile( + select([table1.c.myid]).where( + table1.c.myid.in_(bindparam("foo", [1, 2, 3], expanding=True)) + ), + "SELECT mytable.myid FROM mytable " + "WHERE mytable.myid IN [1, 2, 3]", + literal_binds=True, + ) + + class UnsupportedTest(fixtures.TestBase): def test_unsupported_element_str_visit_name(self): from sqlalchemy.sql.expression import ClauseElement