from sqlalchemy import util, sql
from sqlalchemy.engine import default, reflection
-from sqlalchemy.sql import compiler, visitors, expression
+from sqlalchemy.sql import compiler, visitors, expression, util as sql_util
from sqlalchemy.sql import operators as sql_operators
from sqlalchemy.sql.elements import quoted_name
from sqlalchemy import types as sqltypes, schema as sa_schema
limitselect._oracle_visit = True
limitselect._is_wrapper = True
+ # add expressions to accomodate FOR UPDATE OF
+ for_update = select._for_update_arg
+ if for_update is not None and for_update.of:
+ for_update = for_update._clone()
+ for_update._copy_internals()
+
+ for elem in for_update.of:
+ select.append_column(elem)
+
+ adapter = sql_util.ClauseAdapter(select)
+ for_update.of = [
+ adapter.traverse(elem)
+ for elem in for_update.of]
+
# If needed, add the limiting clause
if limit_clause is not None:
if not self.dialect.use_binds_for_limits:
# If needed, add the ora_rn, and wrap again with offset.
if offset_clause is None:
- limitselect._for_update_arg = select._for_update_arg
+ limitselect._for_update_arg = for_update
select = limitselect
else:
limitselect = limitselect.column(
offsetselect._oracle_visit = True
offsetselect._is_wrapper = True
+ if for_update is not None and for_update.of:
+ for elem in for_update.of:
+ if limitselect.corresponding_column(elem) is None:
+ limitselect.append_column(elem)
+
if not self.dialect.use_binds_for_limits:
offset_clause = sql.literal_column(
"%d" % select._offset)
offsetselect.append_whereclause(
sql.literal_column("ora_rn") > offset_clause)
- offsetselect._for_update_arg = select._for_update_arg
+ offsetselect._for_update_arg = for_update
select = offsetselect
return compiler.SQLCompiler.visit_select(self, select, **kwargs)
"mytable_1.myid, mytable_1.name"
)
+ def test_for_update_of_w_limit_adaption_col_present(self):
+ table1 = table('mytable', column('myid'), column('name'))
+
+ self.assert_compile(
+ select([table1.c.myid, table1.c.name]).
+ where(table1.c.myid == 7).
+ with_for_update(nowait=True, of=table1.c.name).
+ limit(10),
+ "SELECT myid, name FROM "
+ "(SELECT mytable.myid AS myid, mytable.name AS name "
+ "FROM mytable WHERE mytable.myid = :myid_1) "
+ "WHERE ROWNUM <= :param_1 FOR UPDATE OF name NOWAIT",
+ )
+
+ def test_for_update_of_w_limit_adaption_col_unpresent(self):
+ table1 = table('mytable', column('myid'), column('name'))
+
+ self.assert_compile(
+ select([table1.c.myid]).
+ where(table1.c.myid == 7).
+ with_for_update(nowait=True, of=table1.c.name).
+ limit(10),
+ "SELECT myid FROM "
+ "(SELECT mytable.myid AS myid, mytable.name AS name "
+ "FROM mytable WHERE mytable.myid = :myid_1) "
+ "WHERE ROWNUM <= :param_1 FOR UPDATE OF name NOWAIT",
+ )
+
+ def test_for_update_of_w_limit_offset_adaption_col_present(self):
+ table1 = table('mytable', column('myid'), column('name'))
+
+ self.assert_compile(
+ select([table1.c.myid, table1.c.name]).
+ where(table1.c.myid == 7).
+ with_for_update(nowait=True, of=table1.c.name).
+ limit(10).offset(50),
+ "SELECT myid, name FROM (SELECT myid, name, ROWNUM AS ora_rn "
+ "FROM (SELECT mytable.myid AS myid, mytable.name AS name "
+ "FROM mytable WHERE mytable.myid = :myid_1) "
+ "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 "
+ "FOR UPDATE OF name NOWAIT",
+ )
+
+ def test_for_update_of_w_limit_offset_adaption_col_unpresent(self):
+ table1 = table('mytable', column('myid'), column('name'))
+
+ self.assert_compile(
+ select([table1.c.myid]).
+ where(table1.c.myid == 7).
+ with_for_update(nowait=True, of=table1.c.name).
+ limit(10).offset(50),
+ "SELECT myid FROM (SELECT myid, ROWNUM AS ora_rn, name "
+ "FROM (SELECT mytable.myid AS myid, mytable.name AS name "
+ "FROM mytable WHERE mytable.myid = :myid_1) "
+ "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 "
+ "FOR UPDATE OF name NOWAIT",
+ )
+
+ def test_for_update_of_w_limit_offset_adaption_partial_col_unpresent(self):
+ table1 = table('mytable', column('myid'), column('foo'), column('bar'))
+
+ self.assert_compile(
+ select([table1.c.myid, table1.c.bar]).
+ where(table1.c.myid == 7).
+ with_for_update(nowait=True, of=[table1.c.foo, table1.c.bar]).
+ limit(10).offset(50),
+ "SELECT myid, bar FROM (SELECT myid, bar, ROWNUM AS ora_rn, "
+ "foo FROM (SELECT mytable.myid AS myid, mytable.bar AS bar, "
+ "mytable.foo AS foo FROM mytable WHERE mytable.myid = :myid_1) "
+ "WHERE ROWNUM <= :param_1 + :param_2) WHERE ora_rn > :param_2 "
+ "FOR UPDATE OF foo, bar NOWAIT"
+ )
+
def test_limit_preserves_typing_information(self):
class MyType(TypeDecorator):
impl = Integer