From: Mike Bayer Date: Thu, 6 Dec 2007 22:23:10 +0000 (+0000) Subject: - generation of "unique" bind parameters has been simplified to use the same X-Git-Tag: rel_0_4_2~100 X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=541b6772e9b8a09b10bd7a16fa9e2b7f693d1194;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - generation of "unique" bind parameters has been simplified to use the same "unique identifier" mechanisms as everything else. This doesn't affect user code, except any code that might have been hardcoded against the generated names. Generated bind params now have the form "_", whereas before only the second bind of the same name would have this form. - bindparam() objects themselves can be used as keys for execute(), i.e. statement.execute({bind1:'foo', bind2:'bar'}) --- diff --git a/CHANGES b/CHANGES index d73ea69f08..99a7edcff0 100644 --- a/CHANGES +++ b/CHANGES @@ -22,6 +22,15 @@ CHANGES make proper use of Python unicode objects (i.e. u'hello' and not 'hello') so that data round trips accurately. + - generation of "unique" bind parameters has been simplified to use the same + "unique identifier" mechanisms as everything else. This doesn't affect + user code, except any code that might have been hardcoded against the generated + names. Generated bind params now have the form "_", + whereas before only the second bind of the same name would have this form. + + - bindparam() objects themselves can be used as keys for execute(), i.e. + statement.execute({bind1:'foo', bind2:'bar'}) + - tables with schemas can still be used in sqlite, firebird, schema name just gets dropped [ticket:890] diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py index 18c639af06..5d727bd6ae 100644 --- a/lib/sqlalchemy/orm/mapper.py +++ b/lib/sqlalchemy/orm/mapper.py @@ -1066,9 +1066,9 @@ class Mapper(object): mapper = table_to_mapper[table] clause = sql.and_() for col in mapper._pks_by_table[table]: - clause.clauses.append(col == sql.bindparam(col._label, type_=col.type, unique=True)) + clause.clauses.append(col == sql.bindparam(col._label, type_=col.type)) if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col): - clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col._label, type_=col.type, unique=True)) + clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col._label, type_=col.type)) statement = table.update(clause) rows = 0 supports_sane_rowcount = True @@ -1210,9 +1210,9 @@ class Mapper(object): del_objects.sort(comparator) clause = sql.and_() for col in mapper._pks_by_table[table]: - clause.clauses.append(col == sql.bindparam(col.key, type_=col.type, unique=True)) + clause.clauses.append(col == sql.bindparam(col.key, type_=col.type)) if mapper.version_id_col is not None and table.c.contains_column(mapper.version_id_col): - clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col.key, type_=mapper.version_id_col.type, unique=True)) + clause.clauses.append(mapper.version_id_col == sql.bindparam(mapper.version_id_col.key, type_=mapper.version_id_col.type)) statement = table.delete(clause) c = connection.execute(statement, del_objects) if c.supports_sane_multi_rowcount() and c.rowcount != len(del_objects): @@ -1389,11 +1389,11 @@ class Mapper(object): if leftcol is None or rightcol is None: return if leftcol.table not in needs_tables: - binary.left = sql.bindparam(leftcol.name, None, type_=binary.right.type, unique=True) - param_names.append(leftcol) + binary.left = sql.bindparam(None, None, type_=binary.right.type) + param_names.append((leftcol, binary.left)) elif rightcol not in needs_tables: - binary.right = sql.bindparam(rightcol.name, None, type_=binary.right.type, unique=True) - param_names.append(rightcol) + binary.right = sql.bindparam(None, None, type_=binary.right.type) + param_names.append((rightcol, binary.right)) allconds = [] param_names = [] @@ -1487,8 +1487,8 @@ class Mapper(object): identitykey = self.identity_key_from_instance(instance) params = {} - for c in param_names: - params[c.name] = self._get_attr_by_column(instance, c) + for c, bind in param_names: + params[bind] = self._get_attr_by_column(instance, c) row = selectcontext.session.connection(self).execute(statement, params).fetchone() self.populate_instance(selectcontext, instance, row, isnew=False, instancekey=identitykey, ispostselect=True) diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py index 3c647ac604..5a765fbd3e 100644 --- a/lib/sqlalchemy/orm/strategies.py +++ b/lib/sqlalchemy/orm/strategies.py @@ -108,8 +108,8 @@ class ColumnLoader(LoaderStrategy): statement = sql.select(needs_tables, cond, use_labels=True) def create_statement(instance): params = {} - for c in param_names: - params[c.name] = mapper._get_attr_by_column(instance, c) + for (c, bind) in param_names: + params[bind] = mapper._get_attr_by_column(instance, c) return (statement, params) def new_execute(instance, row, isnew, **flags): @@ -297,12 +297,11 @@ class LazyLoader(AbstractRelationLoader): (criterion, lazybinds, rev) = LazyLoader._create_lazy_clause(self.parent_property, reverse_direction=reverse_direction) bind_to_col = dict([(lazybinds[col].key, col) for col in lazybinds]) - class Visitor(visitors.ClauseVisitor): - def visit_bindparam(s, bindparam): - mapper = reverse_direction and self.parent_property.mapper or self.parent_property.parent - if bindparam.key in bind_to_col: - bindparam.value = mapper._get_attr_by_column(instance, bind_to_col[bindparam.key]) - return Visitor().traverse(criterion, clone=True) + def visit_bindparam(bindparam): + mapper = reverse_direction and self.parent_property.mapper or self.parent_property.parent + if bindparam.key in bind_to_col: + bindparam.value = mapper._get_attr_by_column(instance, bind_to_col[bindparam.key]) + return visitors.traverse(criterion, clone=True, visit_bindparam=visit_bindparam) def setup_loader(self, instance, options=None, path=None): if not mapper.has_mapper(instance): @@ -416,7 +415,7 @@ class LazyLoader(AbstractRelationLoader): if should_bind(leftcol, rightcol): col = leftcol binary.left = binds.setdefault(leftcol, - sql.bindparam(None, None, type_=binary.right.type, unique=True)) + sql.bindparam(None, None, type_=binary.right.type)) reverse[rightcol] = binds[col] # the "left is not right" compare is to handle part of a join clause that is "table.c.col1==table.c.col1", @@ -424,7 +423,7 @@ class LazyLoader(AbstractRelationLoader): if leftcol is not rightcol and should_bind(rightcol, leftcol): col = rightcol binary.right = binds.setdefault(rightcol, - sql.bindparam(None, None, type_=binary.left.type, unique=True)) + sql.bindparam(None, None, type_=binary.left.type)) reverse[leftcol] = binds[col] lazywhere = primaryjoin diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index 749ce4c10f..aec75e76c3 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -196,7 +196,7 @@ class DefaultCompiler(engine.Compiled): if params: pd = {} for bindparam, name in self.bind_names.iteritems(): - for paramname in (bindparam.key, bindparam.shortname, name): + for paramname in (bindparam, bindparam.key, bindparam.shortname, name): if paramname in params: pd[name] = params[paramname] break @@ -373,26 +373,13 @@ class DefaultCompiler(engine.Compiled): return self.operators.get(operator, str(operator)) def visit_bindparam(self, bindparam, **kwargs): - # TODO: remove this whole "unique" thing, just use regular - # anonymous params to implement. params used for inserts/updates - # etc. should no longer be "unique". - if bindparam.unique: - count = 1 - key = bindparam.key - # redefine the generated name of the bind param in the case - # that we have multiple conflicting bind parameters. - while self.binds.setdefault(key, bindparam) is not bindparam: - tag = "_%d" % count - key = bindparam.key + tag - count += 1 - bindparam.key = key - return self.bindparam_string(self._truncate_bindparam(bindparam)) - else: - existing = self.binds.get(bindparam.key) - if existing is not None and existing.unique: + name = self._truncate_bindparam(bindparam) + if name in self.binds: + existing = self.binds[name] + if existing.unique or bindparam.unique: raise exceptions.CompileError("Bind parameter '%s' conflicts with unique bind parameter of the same name" % bindparam.key) - self.binds[bindparam.key] = bindparam - return self.bindparam_string(self._truncate_bindparam(bindparam)) + self.binds[bindparam.key] = self.binds[name] = bindparam + return self.bindparam_string(name) def _truncate_bindparam(self, bindparam): if bindparam in self.bind_names: @@ -632,7 +619,7 @@ class DefaultCompiler(engine.Compiled): """ def create_bind_param(col, value): - bindparam = sql.bindparam(col.key, value, type_=col.type, unique=True) + bindparam = sql.bindparam(col.key, value, type_=col.type) self.binds[col.key] = bindparam return self.bindparam_string(self._truncate_bindparam(bindparam)) diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py index 6ad29218f5..732d4fdf9d 100644 --- a/lib/sqlalchemy/sql/expression.py +++ b/lib/sqlalchemy/sql/expression.py @@ -922,7 +922,7 @@ class ClauseElement(object): if bind.key in kwargs: bind.value = kwargs[bind.key] if unique: - bind.unique=True + bind._convert_to_unique() return Vis().traverse(self, clone=True) def compare(self, other): @@ -1749,10 +1749,14 @@ class _BindParamClause(ClauseElement, _CompareMixin): if True, the parameter should be treated like a stored procedure "OUT" parameter. """ - - self.key = key or "{ANON %d param}" % id(self) - self.value = value + + if unique: + self.key = "{ANON %d %s}" % (id(self), key or 'param') + else: + self.key = key or "{ANON %d param}" % id(self) + self._orig_key = key self.unique = unique + self.value = value self.isoutparam = isoutparam self.shortname = shortname @@ -1778,6 +1782,17 @@ class _BindParamClause(ClauseElement, _CompareMixin): type(None):sqltypes.NullType } + def _clone(self): + c = ClauseElement._clone(self) + if self.unique: + c.key = "{ANON %d %s}" % (id(c), c._orig_key or 'param') + return c + + def _convert_to_unique(self): + if not self.unique: + self.unique=True + self.key = "{ANON %d %s}" % (id(self), self._orig_key or 'param') + def _get_from_objects(self, **modifiers): return [] diff --git a/test/dialect/firebird.py b/test/dialect/firebird.py index 0abbdf029c..636ab2d058 100644 --- a/test/dialect/firebird.py +++ b/test/dialect/firebird.py @@ -21,7 +21,7 @@ class CompileTest(SQLCompileTest): self.assert_compile(s, "SELECT sometable_1.col1, sometable_1.col2 FROM sometable sometable_1") def test_function(self): - self.assert_compile(func.foo(1, 2), "foo(:foo, :foo_1)") + self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)") self.assert_compile(func.current_time(), "CURRENT_TIME") self.assert_compile(func.foo(), "foo") diff --git a/test/dialect/mssql.py b/test/dialect/mssql.py index 3b40ed354f..1a85047016 100755 --- a/test/dialect/mssql.py +++ b/test/dialect/mssql.py @@ -16,7 +16,7 @@ class CompileTest(SQLCompileTest): def test_update(self): t = table('sometable', column('somecolumn')) - self.assert_compile(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :sometable_somecolumn", dict(somecolumn=10)) + self.assert_compile(t.update(t.c.somecolumn==7), "UPDATE sometable SET somecolumn=:somecolumn WHERE sometable.somecolumn = :sometable_somecolumn_1", dict(somecolumn=10)) def test_count(self): t = table('sometable', column('somecolumn')) @@ -40,12 +40,12 @@ class CompileTest(SQLCompileTest): select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_(["t2col2r2", "t2col2r3"])) ) u = union(s1, s2, order_by=['col3', 'col4']) - self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2, :t1_col2_1) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2, :t2_col2_1) ORDER BY col3, col4") + self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2_1, :t1_col2_2) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2_3, :t2_col2_4) ORDER BY col3, col4") - self.assert_compile(u.alias('bar').select(), "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2, :t1_col2_1) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2, :t2_col2_1)) AS bar") + self.assert_compile(u.alias('bar').select(), "SELECT bar.col3, bar.col4 FROM (SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2_1, :t1_col2_2) UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2_3, :t2_col2_4)) AS bar") def test_function(self): - self.assert_compile(func.foo(1, 2), "foo(:foo, :foo_1)") + self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)") self.assert_compile(func.current_time(), "CURRENT_TIME") self.assert_compile(func.foo(), "foo()") diff --git a/test/dialect/oracle.py b/test/dialect/oracle.py index fa65837f0e..79d73fb899 100644 --- a/test/dialect/oracle.py +++ b/test/dialect/oracle.py @@ -95,8 +95,8 @@ class CompileTest(SQLCompileTest): self.assert_compile(query, "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \ -(mytable.name = :mytable_name OR mytable.myid = :mytable_myid OR \ -myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo where boo = lar))", +(mytable.name = :mytable_name_1 OR mytable.myid = :mytable_myid_2 OR \ +myothertable.othername != :myothertable_othername_3 OR EXISTS (select yay from foo where boo = lar))", dialect=oracle.OracleDialect(use_ansi = False)) query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid) @@ -125,7 +125,7 @@ myothertable.othername != :myothertable_othername OR EXISTS (select yay from foo order_by(addresses.oid_column, address_types.oid_column) self.assert_compile(s, "SELECT address_types_1.id, address_types_1.name, addresses.id, addresses.user_id, " "addresses.address_type_id, addresses.email_address FROM addresses LEFT OUTER JOIN address_types address_types_1 " - "ON addresses.address_type_id = address_types_1.id WHERE addresses.user_id = :addresses_user_id ORDER BY addresses.rowid, " + "ON addresses.address_type_id = address_types_1.id WHERE addresses.user_id = :addresses_user_id_2 ORDER BY addresses.rowid, " "address_types.rowid") class TypesTest(SQLCompileTest): diff --git a/test/orm/eager_relations.py b/test/orm/eager_relations.py index acb5e0da73..6b2baad95d 100644 --- a/test/orm/eager_relations.py +++ b/test/orm/eager_relations.py @@ -700,11 +700,11 @@ class SelfReferentialEagerTest(ORMTest): if testing.against('sqlite'): self.assert_sql(testbase.db, go, [ ( - "SELECT nodes.id AS nodes_id, nodes.parent_id AS nodes_parent_id, nodes.data AS nodes_data FROM nodes WHERE nodes.data = :nodes_data ORDER BY nodes.oid LIMIT 1 OFFSET 0", - {'nodes_data': 'n1'} + "SELECT nodes.id AS nodes_id, nodes.parent_id AS nodes_parent_id, nodes.data AS nodes_data FROM nodes WHERE nodes.data = :nodes_data_1 ORDER BY nodes.oid LIMIT 1 OFFSET 0", + {'nodes_data_1': 'n1'} ), ]) - + @testing.fails_on('maxdb') def test_no_depth(self): class Node(Base): diff --git a/test/orm/query.py b/test/orm/query.py index 4e5f048558..d4a69d249d 100644 --- a/test/orm/query.py +++ b/test/orm/query.py @@ -208,9 +208,9 @@ class OperatorTest(QueryTest): (operator.sub, '-'), (operator.div, '/'), ): for (lhs, rhs, res) in ( - (5, User.id, ':users_id %s users.id'), + (5, User.id, ':users_id_1 %s users.id'), (5, literal(6), ':param_1 %s :param_2'), - (User.id, 5, 'users.id %s :users_id'), + (User.id, 5, 'users.id %s :users_id_1'), (User.id, literal('b'), 'users.id %s :param_1'), (User.id, User.id, 'users.id %s users.id'), (literal(5), 'b', ':param_1 %s :param_2'), @@ -228,9 +228,9 @@ class OperatorTest(QueryTest): (operator.le, '<=', '>='), (operator.ge, '>=', '<=')): for (lhs, rhs, l_sql, r_sql) in ( - ('a', User.id, ':users_id', 'users.id'), + ('a', User.id, ':users_id_1', 'users.id'), ('a', literal('b'), ':param_2', ':param_1'), # note swap! - (User.id, 'b', 'users.id', ':users_id'), + (User.id, 'b', 'users.id', ':users_id_1'), (User.id, literal('b'), 'users.id', ':param_1'), (User.id, User.id, 'users.id', 'users.id'), (literal('a'), 'b', ':param_1', ':param_2'), @@ -249,15 +249,15 @@ class OperatorTest(QueryTest): fwd_sql + "'\n or\n'" + rev_sql + "'") def test_op(self): - assert str(User.name.op('ilike')('17').compile(dialect=default.DefaultDialect())) == "users.name ilike :users_name" + assert str(User.name.op('ilike')('17').compile(dialect=default.DefaultDialect())) == "users.name ilike :users_name_1" def test_in(self): self._test(User.id.in_(['a', 'b']), - "users.id IN (:users_id, :users_id_1)") + "users.id IN (:users_id_1, :users_id_2)") def test_between(self): self._test(User.id.between('a', 'b'), - "users.id BETWEEN :users_id AND :users_id_1") + "users.id BETWEEN :users_id_1 AND :users_id_2") def test_clauses(self): for (expr, compare) in ( diff --git a/test/sql/generative.py b/test/sql/generative.py index 35a1cc2b18..b404b00179 100644 --- a/test/sql/generative.py +++ b/test/sql/generative.py @@ -218,13 +218,33 @@ class ClauseTest(SQLCompileTest): def visit_binary(self, binary): if binary.left is t1.c.col3: binary.left = t1.c.col1 - binary.right = bindparam("table1_col1") + binary.right = bindparam("table1_col1", unique=True) s5 = Vis().traverse(s4, clone=True) print str(s4) print str(s5) assert str(s5) == s5_assert assert str(s4) == s4_assert - + + def test_binds(self): + """test that unique bindparams change their name upon clone() to prevent conflicts""" + + s = select([t1], t1.c.col1==bindparam(None, unique=True)).alias() + s2 = ClauseVisitor().traverse(s, clone=True).alias() + s3 = select([s], s.c.col2==s2.c.col2) + + self.assert_compile(s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, "\ + "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_1, "\ + "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_3) AS anon_4 "\ + "WHERE anon_1.col2 = anon_4.col2") + + s = select([t1], t1.c.col1==4).alias() + s2 = ClauseVisitor().traverse(s, clone=True).alias() + s3 = select([s], s.c.col2==s2.c.col2) + self.assert_compile(s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, "\ + "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_2) AS anon_1, "\ + "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :table1_col1_3) AS anon_4 "\ + "WHERE anon_1.col2 = anon_4.col2") + def test_alias(self): subq = t2.select().alias('subq') s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)]) @@ -247,7 +267,7 @@ class ClauseTest(SQLCompileTest): def visit_select(self, select): select.append_whereclause(t1.c.col2==7) - self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2") + self.assert_compile(Vis().traverse(s, clone=True), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :table1_col2_1") def test_clause_adapter(self): @@ -358,13 +378,18 @@ class SelectTest(SQLCompileTest): ) def test_select(self): - self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1 ORDER BY table1.col3") + self.assert_compile(t1.select().where(t1.c.col1==5).order_by(t1.c.col3), + "SELECT table1.col1, table1.col2, table1.col3 FROM table1 WHERE table1.col1 = :table1_col1_1 ORDER BY table1.col3") - self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3") + self.assert_compile(t1.select().select_from(select([t2], t2.c.col1==t1.c.col1)).order_by(t1.c.col3), + "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\ + "FROM table2 WHERE table2.col1 = table1.col1) ORDER BY table1.col3") s = select([t2], t2.c.col1==t1.c.col1, correlate=False) s = s.correlate(t1).order_by(t2.c.col3) - self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3), "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3") + self.assert_compile(t1.select().select_from(s).order_by(t1.c.col3), + "SELECT table1.col1, table1.col2, table1.col3 FROM table1, (SELECT table2.col1 AS col1, table2.col2 AS col2, table2.col3 AS col3 "\ + "FROM table2 WHERE table2.col1 = table1.col1 ORDER BY table2.col3) ORDER BY table1.col3") def test_columns(self): s = t1.select() diff --git a/test/sql/query.py b/test/sql/query.py index 9b35cff1c8..6233de7436 100644 --- a/test/sql/query.py +++ b/test/sql/query.py @@ -174,6 +174,25 @@ class QueryTest(PersistTest): r = s.execute(userid='fred').fetchall() assert len(r) == 1 + def test_unique_conflict(self): + u = bindparam('userid', unique=True) + s = users.select(or_(users.c.user_name==u, users.c.user_name==u)) + try: + str(s) + assert False + except exceptions.CompileError, e: + assert str(e) == "Bind parameter '{ANON %d userid}' conflicts with unique bind parameter of the same name" % id(u) + + def test_bindparams_in_params(self): + """test that a _BindParamClause itself can be a key in the params dict""" + + users.insert().execute(user_id = 7, user_name = 'jack') + users.insert().execute(user_id = 8, user_name = 'fred') + + u = bindparam('userid') + r = users.select(users.c.user_name==u).execute({u:'fred'}).fetchall() + assert len(r) == 1 + def test_bindparam_shortname(self): """test the 'shortname' field on BindParamClause.""" users.insert().execute(user_id = 7, user_name = 'jack') diff --git a/test/sql/select.py b/test/sql/select.py index 58c4ea3ddf..d36703af43 100644 --- a/test/sql/select.py +++ b/test/sql/select.py @@ -84,7 +84,8 @@ myothertable.othername FROM mytable, myothertable") s.c.myid == 7 ) , - "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable WHERE mytable.name = :mytable_name) WHERE myid = :myid") + "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable "\ + "WHERE mytable.name = :mytable_name_1) WHERE myid = :myid_2") sq = select([table1]) self.assert_compile( @@ -99,7 +100,7 @@ myothertable.othername FROM mytable, myothertable") self.assert_compile( sq.select(sq.c.myid == 7), "SELECT sq.myid, sq.name, sq.description FROM \ -(SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :sq_myid" +(SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :sq_myid_1" ) sq = select( @@ -111,7 +112,7 @@ myothertable.othername FROM mytable, myothertable") sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, \ mytable.description AS mytable_description, myothertable.otherid AS myothertable_otherid, \ myothertable.othername AS myothertable_othername FROM mytable, myothertable \ -WHERE mytable.myid = :mytable_myid AND myothertable.otherid = mytable.myid" +WHERE mytable.myid = :mytable_myid_1 AND myothertable.otherid = mytable.myid" self.assert_compile(sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, sq.mytable_description, sq.myothertable_otherid, \ sq.myothertable_othername FROM (" + sqstring + ") AS sq") @@ -148,7 +149,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile(select([table1], from_obj=[table1, table1.select()]), """SELECT mytable.myid, mytable.name, mytable.description FROM mytable, (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description AS description FROM mytable)""") def testexistsascolumnclause(self): - self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid)", params={'mytable_myid':5}) + self.assert_compile(exists([table1.c.myid], table1.c.myid==5).select(), "SELECT EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid_1)", params={'mytable_myid':5}) self.assert_compile(select([table1, exists([1], from_obj=table2)]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) FROM mytable", params={}) @@ -183,7 +184,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile( table1.select(table1.c.myid == select([table1.c.myid], table1.c.name=='jack')), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :mytable_name)" + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.name = :mytable_name_1)" ) self.assert_compile( @@ -279,8 +280,8 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ) self.assert_compile(q,"SELECT places.id, places.nm, zips.zipcode, latlondist((SELECT zips.latitude FROM zips WHERE " - "zips.zipcode = :zips_zipcode), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_1)) AS dist " - "FROM places, zips WHERE zips.zipcode = :zips_zipcode_2 ORDER BY dist, places.nm") + "zips.zipcode = :zips_zipcode_1), (SELECT zips.longitude FROM zips WHERE zips.zipcode = :zips_zipcode_2)) AS dist " + "FROM places, zips WHERE zips.zipcode = :zips_zipcode_3 ORDER BY dist, places.nm") zalias = zips.alias('main_zip') qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode, scalar=True) @@ -303,7 +304,8 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A def testand(self): self.assert_compile( select(['*'], and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()")), - "SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND mytable.name = :mytable_name AND myothertable.othername = :myothertable_othername AND sysdate() = today()" + "SELECT * FROM mytable, myothertable WHERE mytable.myid = :mytable_myid_1 AND mytable.name = :mytable_name_2 "\ + "AND myothertable.othername = :myothertable_othername_3 AND sysdate() = today()" ) def testor(self): @@ -313,8 +315,8 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A or_(table2.c.othername=='asdf', table2.c.othername == 'foo', table2.c.otherid == 9), "sysdate() = today()", )), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :mytable_myid AND (myothertable.othername = :myothertable_othername OR myothertable.othername = :myothertable_othername_1 OR myothertable.otherid = :myothertable_otherid) AND sysdate() = today()", - checkparams = {'myothertable_othername': 'asdf', 'myothertable_othername_1':'foo', 'myothertable_otherid': 9, 'mytable_myid': 12} + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :mytable_myid_1 AND (myothertable.othername = :myothertable_othername_2 OR myothertable.othername = :myothertable_othername_3 OR myothertable.otherid = :myothertable_otherid_4) AND sysdate() = today()", + checkparams = {'myothertable_othername_2': 'asdf', 'myothertable_othername_3':'foo', 'myothertable_otherid_4': 9, 'mytable_myid_1': 12} ) def testdistinct(self): @@ -344,9 +346,9 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A (operator.sub, '-'), (operator.div, '/'), ): for (lhs, rhs, res) in ( - (5, table1.c.myid, ':mytable_myid %s mytable.myid'), + (5, table1.c.myid, ':mytable_myid_1 %s mytable.myid'), (5, literal(5), ':param_1 %s :param_2'), - (table1.c.myid, 'b', 'mytable.myid %s :mytable_myid'), + (table1.c.myid, 'b', 'mytable.myid %s :mytable_myid_1'), (table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'), (table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'), (literal(5), 8, ':param_1 %s :param_2'), @@ -363,9 +365,9 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A (operator.le, '<=', '>='), (operator.ge, '>=', '<=')): for (lhs, rhs, l_sql, r_sql) in ( - ('a', table1.c.myid, ':mytable_myid', 'mytable.myid'), + ('a', table1.c.myid, ':mytable_myid_1', 'mytable.myid'), ('a', literal('b'), ':param_2', ':param_1'), # note swap! - (table1.c.myid, 'b', 'mytable.myid', ':mytable_myid'), + (table1.c.myid, 'b', 'mytable.myid', ':mytable_myid_1'), (table1.c.myid, literal('b'), 'mytable.myid', ':param_1'), (table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'), (literal('a'), 'b', ':param_1', ':param_2'), @@ -385,22 +387,24 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A self.assert_compile( table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND mytable.name != :mytable_name" + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND mytable.name != :mytable_name_2" ) self.assert_compile( table1.select((table1.c.myid != 12) & ~(table1.c.name.between('jack','john'))), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name BETWEEN :mytable_name AND :mytable_name_1)" + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND "\ + "NOT (mytable.name BETWEEN :mytable_name_2 AND :mytable_name_3)" ) self.assert_compile( table1.select((table1.c.myid != 12) & ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT (mytable.name = :mytable_name AND mytable.name = :mytable_name_1 AND mytable.name = :mytable_name_2)" + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND "\ + "NOT (mytable.name = :mytable_name_2 AND mytable.name = :mytable_name_3 AND mytable.name = :mytable_name_4)" ) self.assert_compile( table1.select((table1.c.myid != 12) & ~table1.c.name), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid AND NOT mytable.name" + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid != :mytable_myid_1 AND NOT mytable.name" ) self.assert_compile( @@ -410,7 +414,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A # test the op() function, also that its results are further usable in expressions self.assert_compile( table1.select(table1.c.myid.op('hoho')(12)==14), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid) = :param_1" + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (mytable.myid hoho :mytable_myid_1) = :param_2" ) # test that clauses can be pickled (operators need to be module-level, etc.) @@ -420,27 +424,27 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A def testextracomparisonoperators(self): self.assert_compile( table1.select(table1.c.name.contains('jo')), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name", - checkparams = {'mytable_name': u'%jo%'}, + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name_1", + checkparams = {'mytable_name_1': u'%jo%'}, ) self.assert_compile( table1.select(table1.c.name.endswith('hn')), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name", - checkparams = {'mytable_name': u'%hn'}, + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name_1", + checkparams = {'mytable_name_1': u'%hn'}, ) def testunicodestartswith(self): string = u"hi \xf6 \xf5" self.assert_compile( table1.select(table1.c.name.startswith(string)), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name", - checkparams = {'mytable_name': u'hi \xf6 \xf5%'}, + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.name LIKE :mytable_name_1", + checkparams = {'mytable_name_1': u'hi \xf6 \xf5%'}, ) def testmultiparam(self): self.assert_compile( select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf', table1.c.myid == 'foo')), - "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid OR mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2" + "SELECT * FROM mytable WHERE mytable.myid = :mytable_myid_1 OR mytable.myid = :mytable_myid_2 OR mytable.myid = :mytable_myid_3" ) def testorderby(self): @@ -468,17 +472,17 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A ) def testforupdate(self): - self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE") + self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE") - self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE") + self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE") - self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE NOWAIT", dialect=oracle.dialect()) + self.assert_compile(table1.select(table1.c.myid==7, for_update="nowait"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE NOWAIT", dialect=oracle.dialect()) self.assert_compile(table1.select(table1.c.myid==7, for_update="read"), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", dialect=mysql.dialect()) self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = %s FOR UPDATE", dialect=mysql.dialect()) - self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid FOR UPDATE", dialect=oracle.dialect()) + self.assert_compile(table1.select(table1.c.myid==7, for_update=True), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 FOR UPDATE", dialect=oracle.dialect()) def testalias(self): # test the alias for a table1. column names stay the same, table name "changes" to "foo". @@ -512,7 +516,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A t2view.mytable_description AS t2view_mytable_description, t2view.myothertable_otherid AS t2view_myothertable_otherid FROM \ (SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, mytable.description AS mytable_description, \ myothertable.otherid AS myothertable_otherid FROM mytable, myothertable \ -WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid" +WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid_1" ) @@ -670,7 +674,7 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today """tests the generation of functions using the func keyword""" # test an expression with a function self.assert_compile(func.lala(3, 4, literal("five"), table1.c.myid) * table2.c.otherid, - "lala(:lala, :lala_1, :param_1, mytable.myid) * myothertable.otherid") + "lala(:lala_1, :lala_2, :param_3, mytable.myid) * myothertable.otherid") # test it in a SELECT self.assert_compile(select([func.count(table1.c.myid)]), @@ -683,16 +687,16 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today # test the bind parameter name with a "dotted" function name is only the name # (limits the length of the bind param name) self.assert_compile(select([func.foo.bar.lala(12)]), - "SELECT foo.bar.lala(:lala)") + "SELECT foo.bar.lala(:lala_1)") # test a dotted func off the engine itself - self.assert_compile(func.lala.hoho(7), "lala.hoho(:hoho)") + self.assert_compile(func.lala.hoho(7), "lala.hoho(:hoho_1)") # test None becomes NULL - self.assert_compile(func.my_func(1,2,None,3), "my_func(:my_func, :my_func_1, NULL, :my_func_2)") + self.assert_compile(func.my_func(1,2,None,3), "my_func(:my_func_1, :my_func_2, NULL, :my_func_3)") # test pickling - self.assert_compile(util.pickle.loads(util.pickle.dumps(func.my_func(1, 2, None, 3))), "my_func(:my_func, :my_func_1, NULL, :my_func_2)") + self.assert_compile(util.pickle.loads(util.pickle.dumps(func.my_func(1, 2, None, 3))), "my_func(:my_func_1, :my_func_2, NULL, :my_func_3)") # assert func raises AttributeError for __bases__ attribute, since its not a class # fixes pydoc @@ -723,16 +727,16 @@ FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND datetime(foo) = Today self.assert_compile(s, "SELECT users.id, users.name, users.fullname " "FROM users, (SELECT q, z, r " - "FROM calculate(:x, :y)) AS c1, (SELECT q, z, r " - "FROM calculate(:x_1, :y_1)) AS c2 " + "FROM calculate(:x_1, :y_2)) AS c1, (SELECT q, z, r " + "FROM calculate(:x_3, :y_4)) AS c2 " "WHERE users.id BETWEEN c1.z AND c2.z" - , checkparams={'y': 45, 'x': 17, 'y_1': 12, 'x_1': 5}) + , checkparams={'y_2': 45, 'x_1': 17, 'y_4': 12, 'x_3': 5}) def testextract(self): """test the EXTRACT function""" self.assert_compile(select([extract("month", table3.c.otherstuff)]), "SELECT extract(month FROM thirdtable.otherstuff) FROM thirdtable") - self.assert_compile(select([extract("day", func.to_date("03/20/2005", "MM/DD/YYYY"))]), "SELECT extract(day FROM to_date(:to_date, :to_date_1))") + self.assert_compile(select([extract("day", func.to_date("03/20/2005", "MM/DD/YYYY"))]), "SELECT extract(day FROM to_date(:to_date_1, :to_date_2))") def testjoin(self): self.assert_compile( @@ -785,9 +789,9 @@ mytable.description FROM myothertable JOIN mytable ON mytable.myid = myothertabl ) self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description \ -FROM mytable WHERE mytable.myid = :mytable_myid UNION \ +FROM mytable WHERE mytable.myid = :mytable_myid_1 UNION \ SELECT mytable.myid, mytable.name, mytable.description \ -FROM mytable WHERE mytable.myid = :mytable_myid_1 ORDER BY mytable.myid") +FROM mytable WHERE mytable.myid = :mytable_myid_2 ORDER BY mytable.myid") self.assert_compile( union( @@ -828,8 +832,8 @@ FROM myothertable ORDER BY myid \ ) , "SELECT mytable.myid, mytable.name, max(mytable.description) FROM mytable \ -WHERE mytable.name = :mytable_name GROUP BY mytable.myid, mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description \ -FROM mytable WHERE mytable.name = :mytable_name_1" +WHERE mytable.name = :mytable_name_1 GROUP BY mytable.myid, mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description \ +FROM mytable WHERE mytable.name = :mytable_name_2" ) def test_compound_select_grouping(self): @@ -873,8 +877,8 @@ UNION SELECT mytable.myid FROM mytable" self.assert_compile(query, "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \ FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid \ -WHERE mytable.name = %(mytable_name)s OR mytable.myid = %(mytable_myid)s OR \ -myothertable.othername != %(myothertable_othername)s OR \ +WHERE mytable.name = %(mytable_name_1)s OR mytable.myid = %(mytable_myid_2)s OR \ +myothertable.othername != %(myothertable_othername_3)s OR \ EXISTS (select yay from foo where boo = lar)", dialect=postgres.dialect() ) @@ -918,10 +922,10 @@ EXISTS (select yay from foo where boo = lar)", ), ( select([table1], or_(table1.c.myid==bindparam('myid', unique=True), table2.c.otherid==bindparam('myid', unique=True))), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myid_1", + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid_1 OR myothertable.otherid = :myid_2", "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?", - {'myid':None, 'myid_1':None}, [None, None], - {'myid':5, 'myid_1': 6}, {'myid':5, 'myid_1':6}, [5,6] + {'myid_1':None, 'myid_2':None}, [None, None], + {'myid_1':5, 'myid_2': 6}, {'myid_1':5, 'myid_2':6}, [5,6] ), ( bindparam('test', type_=String) + text("'hi'"), @@ -939,10 +943,10 @@ EXISTS (select yay from foo where boo = lar)", ), ( select([table1], or_(table1.c.myid==bindparam('myid', value=7, unique=True), table2.c.otherid==bindparam('myid', value=8, unique=True))), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid OR myothertable.otherid = :myid_1", + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = :myid_1 OR myothertable.otherid = :myid_2", "SELECT mytable.myid, mytable.name, mytable.description FROM mytable, myothertable WHERE mytable.myid = ? OR myothertable.otherid = ?", - {'myid':7, 'myid_1':8}, [7,8], - {'myid':5, 'myid_1':6}, {'myid':5, 'myid_1':6}, [5,6] + {'myid_1':7, 'myid_2':8}, [7,8], + {'myid_1':5, 'myid_2':6}, {'myid_1':5, 'myid_2':6}, [5,6] ), ]: @@ -952,7 +956,7 @@ EXISTS (select yay from foo where boo = lar)", positional = stmt.compile(dialect=sqlite.dialect()) pp = positional.get_params() assert [pp[k] for k in positional.positiontup] == expected_default_params_list - assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict).get_raw_dict())) + assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict))) pp = positional.get_params(**test_param_dict) assert [pp[k] for k in positional.positiontup] == expected_test_params_list @@ -966,12 +970,12 @@ EXISTS (select yay from foo where boo = lar)", # check that conflicts with "unique" params are caught - s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('mytable_myid'))) + s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('mytable_myid_1'))) try: - str(s) + print str(s) assert False except exceptions.CompileError, err: - assert str(err) == "Bind parameter 'mytable_myid' conflicts with unique bind parameter of the same name" + assert str(err) == "Bind parameter 'mytable_myid_1' conflicts with unique bind parameter of the same name" s = select([table1], or_(table1.c.myid==7, table1.c.myid==8, table1.c.myid==bindparam('mytable_myid_1'))) try: @@ -990,40 +994,40 @@ EXISTS (select yay from foo where boo = lar)", def testin(self): self.assert_compile(select([table1], table1.c.myid.in_(['a'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1)") self.assert_compile(select([table1], ~table1.c.myid.in_(['a'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid NOT IN (:mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid NOT IN (:mytable_myid_1)") self.assert_compile(select([table1], table1.c.myid.in_(['a', 'b'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2)") self.assert_compile(select([table1], table1.c.myid.in_(iter(['a', 'b']))), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a')])), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), 'b'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid_2)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), literal('b')])), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :param_2)") self.assert_compile(select([table1], table1.c.myid.in_(['a', literal('b')])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :param_1)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :param_2)") self.assert_compile(select([table1], table1.c.myid.in_([literal(1) + 'a'])), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 + :param_2)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a') +'a', 'b'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :mytable_myid_3)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a') + literal('a'), literal('b')])), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 || :param_2, :param_3)") self.assert_compile(select([table1], table1.c.myid.in_([1, literal(3) + 4])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :param_1 + :param_2)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :param_2 + :param_3)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a') < 'b'])), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1 < :param_2)") @@ -1032,19 +1036,19 @@ EXISTS (select yay from foo where boo = lar)", "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (mytable.myid)") self.assert_compile(select([table1], table1.c.myid.in_(['a', table1.c.myid])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, mytable.myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, mytable.myid)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid])), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid)") self.assert_compile(select([table1], table1.c.myid.in_([literal('a'), table1.c.myid +'a'])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid + :mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, mytable.myid + :mytable_myid_2)") self.assert_compile(select([table1], table1.c.myid.in_([literal(1), 'a' + table1.c.myid])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid + mytable.myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:param_1, :mytable_myid_2 + mytable.myid)") self.assert_compile(select([table1], table1.c.myid.in_([1, 2, 3])), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1, :mytable_myid_2)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2, :mytable_myid_3)") self.assert_compile(select([table1], table1.c.myid.in_(select([table2.c.otherid]))), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (SELECT myothertable.otherid FROM myothertable)") @@ -1059,8 +1063,8 @@ EXISTS (select yay from foo where boo = lar)", ) )), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable \ WHERE mytable.myid IN (\ -SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid \ -UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1)") +SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_1 \ +UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid = :mytable_myid_2)") # test that putting a select in an IN clause does not blow away its ORDER BY clause self.assert_compile( @@ -1079,13 +1083,13 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE def test_in_deprecated_api(self): self.assert_compile(select([table1], table1.c.myid.in_('abc')), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1)") self.assert_compile(select([table1], table1.c.myid.in_(1)), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1)") self.assert_compile(select([table1], table1.c.myid.in_(1,2)), - "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid, :mytable_myid_1)") + "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:mytable_myid_1, :mytable_myid_2)") self.assert_compile(select([table1], table1.c.myid.in_()), "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)") @@ -1130,31 +1134,33 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE import datetime table = Table('dt', metadata, Column('date', Date)) - self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date AND :dt_date_1", checkparams={'dt_date':datetime.date(2006,6,1), 'dt_date_1':datetime.date(2006,6,5)}) + self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))), + "SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date_1 AND :dt_date_2", checkparams={'dt_date_1':datetime.date(2006,6,1), 'dt_date_2':datetime.date(2006,6,5)}) - self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)}) + self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))), + "SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)}) def test_operator_precedence(self): table = Table('op', metadata, Column('field', Integer)) self.assert_compile(table.select((table.c.field == 5) == None), - "SELECT op.field FROM op WHERE (op.field = :op_field) IS NULL") + "SELECT op.field FROM op WHERE (op.field = :op_field_1) IS NULL") self.assert_compile(table.select((table.c.field + 5) == table.c.field), - "SELECT op.field FROM op WHERE op.field + :op_field = op.field") + "SELECT op.field FROM op WHERE op.field + :op_field_1 = op.field") self.assert_compile(table.select((table.c.field + 5) * 6), - "SELECT op.field FROM op WHERE (op.field + :op_field) * :param_1") + "SELECT op.field FROM op WHERE (op.field + :op_field_1) * :param_2") self.assert_compile(table.select((table.c.field * 5) + 6), - "SELECT op.field FROM op WHERE op.field * :op_field + :param_1") + "SELECT op.field FROM op WHERE op.field * :op_field_1 + :param_2") self.assert_compile(table.select(5 + table.c.field.in_([5,6])), - "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:op_field, :op_field_1))") + "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:op_field_2, :op_field_3))") self.assert_compile(table.select((5 + table.c.field).in_([5,6])), - "SELECT op.field FROM op WHERE :op_field + op.field IN (:param_1, :param_2)") + "SELECT op.field FROM op WHERE :op_field_1 + op.field IN (:param_2, :param_3)") self.assert_compile(table.select(not_(and_(table.c.field == 5, table.c.field == 7))), - "SELECT op.field FROM op WHERE NOT (op.field = :op_field AND op.field = :op_field_1)") + "SELECT op.field FROM op WHERE NOT (op.field = :op_field_1 AND op.field = :op_field_2)") self.assert_compile(table.select(not_(table.c.field == 5)), - "SELECT op.field FROM op WHERE op.field != :op_field") + "SELECT op.field FROM op WHERE op.field != :op_field_1") self.assert_compile(table.select(not_(table.c.field.between(5, 6))), - "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :op_field AND :op_field_1)") + "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :op_field_1 AND :op_field_2)") self.assert_compile(table.select(not_(table.c.field) == 5), "SELECT op.field FROM op WHERE (NOT op.field) = :param_1") self.assert_compile(table.select((table.c.field == table.c.field).between(False, True)), @@ -1204,16 +1210,16 @@ class CRUDTest(SQLCompileTest): self.assert_compile(insert(table1, values=dict(myid=func.lala())), "INSERT INTO mytable (myid) VALUES (lala())") def testupdate(self): - self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {table1.c.name:'fred'}) - self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid", params = {'name':'fred'}) + self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid_1", params = {table1.c.name:'fred'}) + self.assert_compile(update(table1, table1.c.myid == 7), "UPDATE mytable SET name=:name WHERE mytable.myid = :mytable_myid_1", params = {'name':'fred'}) self.assert_compile(update(table1, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid") self.assert_compile(update(table1, whereclause = table1.c.name == bindparam('crit'), values = {table1.c.name : 'hi'}), "UPDATE mytable SET name=:name WHERE mytable.name = :crit", params = {'crit' : 'notthere'}, checkparams={'crit':'notthere', 'name':'hi'}) - self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'}, checkparams={'description':'test', 'mytable_myid':12}) - self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :mytable_myid", params = {'mytable_myid': 12, 'myid': 9, 'description': 'test'}) - self.assert_compile(update(table1, table1.c.myid ==12), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :mytable_myid", params={'myid':18}, checkparams={'myid':18, 'mytable_myid':12}) + self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}), "UPDATE mytable SET name=mytable.myid, description=:description WHERE mytable.myid = :mytable_myid_1", params = {'description':'test'}, checkparams={'description':'test', 'mytable_myid_1':12}) + self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.myid : 9}), "UPDATE mytable SET myid=:myid, description=:description WHERE mytable.myid = :mytable_myid_1", params = {'mytable_myid_1': 12, 'myid': 9, 'description': 'test'}) + self.assert_compile(update(table1, table1.c.myid ==12), "UPDATE mytable SET myid=:myid WHERE mytable.myid = :mytable_myid_1", params={'myid':18}, checkparams={'myid':18, 'mytable_myid_1':12}) s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'}) c = s.compile(column_keys=['mytable_id', 'name']) - self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :mytable_name), description=:description WHERE mytable.myid = :mytable_myid", params = {'description':'test'}) + self.assert_compile(update(table1, table1.c.myid == 12, values = {table1.c.name : table1.c.myid}).values({table1.c.name:table1.c.name + 'foo'}), "UPDATE mytable SET name=(mytable.name || :mytable_name_1), description=:description WHERE mytable.myid = :mytable_myid_2", params = {'description':'test'}) self.assert_(str(s) == str(c)) def testupdateexpression(self): @@ -1223,8 +1229,8 @@ class CRUDTest(SQLCompileTest): values = { table1.c.name : table1.c.name + "lala", table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho')) - }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :mytable_name) " - "WHERE mytable.myid = hoho(:hoho) AND mytable.name = :param_2 || mytable.name || :param_3") + }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), name=(mytable.name || :mytable_name_2) " + "WHERE mytable.myid = hoho(:hoho_3) AND mytable.name = :param_4 || mytable.name || :param_5") def testcorrelatedupdate(self): # test against a straight text subquery @@ -1238,26 +1244,29 @@ class CRUDTest(SQLCompileTest): # test against a regular constructed subquery s = select([table2], table2.c.otherid == table1.c.myid) u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s}) - self.assert_compile(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :mytable_name") + self.assert_compile(u, "UPDATE mytable SET name=(SELECT myothertable.otherid, myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid) WHERE mytable.name = :mytable_name_1") # test a non-correlated WHERE clause s = select([table2.c.othername], table2.c.otherid == 7) u = update(table1, table1.c.name==s) - self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)") + self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = "\ + "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid_1)") # test one that is actually correlated... s = select([table2.c.othername], table2.c.otherid == table1.c.myid) u = table1.update(table1.c.name==s) - self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") + self.assert_compile(u, "UPDATE mytable SET myid=:myid, name=:name, description=:description WHERE mytable.name = "\ + "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)") def testdelete(self): - self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid") + self.assert_compile(delete(table1, table1.c.myid == 7), "DELETE FROM mytable WHERE mytable.myid = :mytable_myid_1") def testcorrelateddelete(self): # test a non-correlated WHERE clause s = select([table2.c.othername], table2.c.otherid == 7) u = delete(table1, table1.c.name==s) - self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid)") + self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = "\ + "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :myothertable_otherid_1)") # test one that is actually correlated... s = select([table2.c.othername], table2.c.otherid == table1.c.myid) @@ -1275,7 +1284,7 @@ class InlineDefaultTest(SQLCompileTest): Column('col2', Integer, default=select([func.coalesce(func.max(foo.c.id))])), ) - self.assert_compile(t.insert(inline=True, values={}), "INSERT INTO test (col1, col2) VALUES (foo(:foo), (SELECT coalesce(max(foo.id)) FROM foo))") + self.assert_compile(t.insert(inline=True, values={}), "INSERT INTO test (col1, col2) VALUES (foo(:foo_1), (SELECT coalesce(max(foo.id)) FROM foo))") def test_update(self): m = MetaData() @@ -1288,27 +1297,34 @@ class InlineDefaultTest(SQLCompileTest): Column('col3', String(30)) ) - self.assert_compile(t.update(inline=True, values={'col3':'foo'}), "UPDATE test SET col1=foo(:foo), col2=(SELECT coalesce(max(foo.id)) FROM foo), col3=:col3") + self.assert_compile(t.update(inline=True, values={'col3':'foo'}), "UPDATE test SET col1=foo(:foo_1), col2=(SELECT coalesce(max(foo.id)) FROM foo), col3=:col3") class SchemaTest(SQLCompileTest): def testselect(self): # these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable") - self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value") + self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), + "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE "\ + "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_2") s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')) s.use_labels = True - self.assert_compile(s, "SELECT remotetable.rem_id AS remotetable_rem_id, remotetable.datatype_id AS remotetable_datatype_id, remotetable.value AS remotetable_value FROM remote_owner.remotetable WHERE remotetable.datatype_id = :remotetable_datatype_id AND remotetable.value = :remotetable_value") + self.assert_compile(s, "SELECT remotetable.rem_id AS remotetable_rem_id, remotetable.datatype_id AS remotetable_datatype_id, remotetable.value "\ + "AS remotetable_value FROM remote_owner.remotetable WHERE "\ + "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_2") def testalias(self): a = alias(table4, 'remtable') - self.assert_compile(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable WHERE remtable.datatype_id = :remtable_datatype_id") + self.assert_compile(a.select(a.c.datatype_id==7), "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM remote_owner.remotetable AS remtable "\ + "WHERE remtable.datatype_id = :remtable_datatype_id_1") def testupdate(self): - self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id WHERE remotetable.value = :remotetable_value") + self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "\ + "WHERE remotetable.value = :remotetable_value_1") def testinsert(self): - self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES (:rem_id, :datatype_id, :value)") + self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "\ + "(:rem_id, :datatype_id, :value)") if __name__ == "__main__": testbase.main() diff --git a/test/sql/selectable.py b/test/sql/selectable.py index a862b0bdfe..49a61bf2b9 100755 --- a/test/sql/selectable.py +++ b/test/sql/selectable.py @@ -264,7 +264,7 @@ class PrimaryKeyTest(AssertMixin): b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True)) j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5)) - assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x", str(j) + assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :b_x_1", str(j) assert list(j.primary_key) == [a.c.id, b.c.x] class DerivedTest(AssertMixin):