name = self._truncate_bindparam(bindparam)
if name in self.binds:
existing = self.binds[name]
- if existing is not bindparam and (existing.unique or bindparam.unique):
- raise exc.CompileError(
- "Bind parameter '%s' conflicts with unique bind parameter of the same name" % bindparam.key
- )
+ if existing is not bindparam:
+ if existing.unique or bindparam.unique:
+ raise exc.CompileError(
+ "Bind parameter '%s' conflicts with "
+ "unique bind parameter of the same name" % bindparam.key
+ )
+ elif getattr(existing, '_is_crud', False):
+ raise exc.CompileError(
+ "Bind parameter name '%s' is reserved "
+ "for the VALUES or SET clause of this insert/update statement."
+ % bindparam.key
+ )
+
self.binds[bindparam.key] = self.binds[name] = bindparam
return self.bindparam_string(name)
if not colparams and \
not self.dialect.supports_default_values and \
not self.dialect.supports_empty_insert:
- raise exc.CompileError(
- "The version of %s you are using does not support empty inserts." % self.dialect.name)
+ raise exc.CompileError("The version of %s you are using does "
+ "not support empty inserts." %
+ self.dialect.name)
preparer = self.preparer
supports_default_values = self.dialect.supports_default_values
def _create_crud_bind_param(self, col, value, required=False):
bindparam = sql.bindparam(col.key, value, type_=col.type, required=required)
+ bindparam._is_crud = True
+ if col.key in self.binds:
+ raise exc.CompileError(
+ "Bind parameter name '%s' is reserved "
+ "for the VALUES or SET clause of this insert/update statement."
+ % col.key
+ )
+
self.binds[col.key] = bindparam
return self.bindparam_string(self._truncate_bindparam(bindparam))
self.prefetch = []
self.returning = []
- # get the keys of explicitly constructed bindparam() objects
- # TODO: ouch
- bind_names = set(b.key for b in visitors.iterate(stmt, {})
- if b.__visit_name__ == 'bindparam')
-
- if stmt.parameters:
- bind_names.update(stmt.parameters)
-
# no parameters in the statement, no parameters in the
# compiled params - return binds for all columns
if self.column_keys is None and stmt.parameters is None:
return [
(c, self._create_crud_bind_param(c, None, required=True))
- for c in stmt.table.columns if c.key not in bind_names
+ for c in stmt.table.columns
]
required = object()
parameters = {}
else:
parameters = dict((sql._column_as_key(key), required)
- for key in self.column_keys if key not in bind_names)
+ for key in self.column_keys
+ if not stmt.parameters or key not in stmt.parameters)
if stmt.parameters is not None:
for k, v in stmt.parameters.iteritems():
self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = (SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = mytable.myid)")
def test_binds_that_match_columns(self):
- """test bind params named after column names replace the normal SET/VALUES generation."""
+ """test bind params named after column names
+ replace the normal SET/VALUES generation."""
t = table('foo', column('x'), column('y'))
u = t.update().where(t.c.x==bindparam('x'))
+
+ assert_raises(exc.CompileError, u.compile)
- self.assert_compile(u, "UPDATE foo SET y=:y WHERE foo.x = :x")
self.assert_compile(u, "UPDATE foo SET WHERE foo.x = :x", params={})
- self.assert_compile(u.values(x=7), "UPDATE foo SET x=:x WHERE foo.x = :x")
+
+ assert_raises(exc.CompileError, u.values(x=7).compile)
+
self.assert_compile(u.values(y=7), "UPDATE foo SET y=:y WHERE foo.x = :x")
- self.assert_compile(u.values(x=7), "UPDATE foo SET x=:x, y=:y WHERE foo.x = :x", params={'x':1, 'y':2})
- self.assert_compile(u, "UPDATE foo SET y=:y WHERE foo.x = :x", params={'x':1, 'y':2})
- self.assert_compile(u.values(x=3 + bindparam('x')), "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x")
- self.assert_compile(u.values(x=3 + bindparam('x')), "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x", params={'x':1})
- self.assert_compile(u.values(x=3 + bindparam('x')), "UPDATE foo SET x=(:param_1 + :x), y=:y WHERE foo.x = :x", params={'x':1, 'y':2})
+ assert_raises(exc.CompileError, u.values(x=7).compile, column_keys=['x', 'y'])
+ assert_raises(exc.CompileError, u.compile, column_keys=['x', 'y'])
+
+ self.assert_compile(u.values(x=3 + bindparam('x')),
+ "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x")
+
+ self.assert_compile(u.values(x=3 + bindparam('x')),
+ "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x",
+ params={'x':1})
+
+ self.assert_compile(u.values(x=3 + bindparam('x')),
+ "UPDATE foo SET x=(:param_1 + :x), y=:y WHERE foo.x = :x",
+ params={'x':1, 'y':2})
i = t.insert().values(x=3 + bindparam('x'))
self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x))")
- self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)", params={'x':1, 'y':2})
+ self.assert_compile(i,
+ "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)",
+ params={'x':1, 'y':2})
+
+ i = t.insert().values(x=bindparam('y'))
+ self.assert_compile(i, "INSERT INTO foo (x) VALUES (:y)")
+ i = t.insert().values(x=bindparam('y'), y=5)
+ assert_raises(exc.CompileError, i.compile)
+
+ i = t.insert().values(x=3 + bindparam('y'), y=5)
+ assert_raises(exc.CompileError, i.compile)
+
i = t.insert().values(x=3 + bindparam('x2'))
self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))")
self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", params={})
- self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)", params={'x':1, 'y':2})
- self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)", params={'x2':1, 'y':2})
+ self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
+ params={'x':1, 'y':2})
+ self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
+ params={'x2':1, 'y':2})
class InlineDefaultTest(TestBase, AssertsCompiledSQL):
def test_insert(self):