Modified the :paramref:`.Select.with_for_update.of` parameter so that if a
join or other composed selectable is passed, the individual :class:`.Table`
objects will be filtered from it, allowing one to pass a join() object to
the parameter, as occurs normally when using joined table inheritance with
the ORM. Pull request courtesy Raymond Lu.
Fixes: #4550
Co-authored-by: Mike Bayer <mike_mp@zzzcomputing.com>
Closes: #4551
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/4551
Pull-request-sha:
452da77d154a4087d530456db1c9af207d65cef4
Change-Id: If4b7c231f7b71190d7245543959fb5c3351125a1
--- /dev/null
+.. change::
+ :tags: bug, postgresql
+ :tickets: 4550
+
+ Modified the :paramref:`.Select.with_for_update.of` parameter so that if a
+ join or other composed selectable is passed, the individual :class:`.Table`
+ objects will be filtered from it, allowing one to pass a join() object to
+ the parameter, as occurs normally when using joined table inheritance with
+ the ORM. Pull request courtesy Raymond Lu.
+
from ...sql import elements
from ...sql import expression
from ...sql import sqltypes
+from ...sql import util as sql_util
from ...types import BIGINT
from ...types import BOOLEAN
from ...types import CHAR
tmp = " FOR UPDATE"
if select._for_update_arg.of:
- tables = util.OrderedSet(
- c.table if isinstance(c, expression.ColumnClause) else c
- for c in select._for_update_arg.of
- )
+
+ tables = util.OrderedSet()
+ for c in select._for_update_arg.of:
+ tables.update(sql_util.surface_selectables_only(c))
+
tmp += " OF " + ", ".join(
self.process(table, ashint=True, use_schema=False, **kw)
for table in tables
from .elements import Null
from .elements import UnaryExpression
from .schema import Column
+from .selectable import Alias
from .selectable import FromClause
from .selectable import FromGrouping
from .selectable import Join
from .selectable import ScalarSelect
from .selectable import SelectBase
+from .selectable import TableClause
from .. import exc
from .. import util
stack.append(elem.element)
+def surface_selectables_only(clause):
+ stack = [clause]
+ while stack:
+ elem = stack.pop()
+ if isinstance(elem, (TableClause, Alias)):
+ yield elem
+ if isinstance(elem, Join):
+ stack.extend((elem.left, elem.right))
+ elif isinstance(elem, FromGrouping):
+ stack.append(elem.element)
+ elif isinstance(elem, ColumnClause):
+ stack.append(elem.table)
+
+
def surface_column_elements(clause, include_scalar_selects=True):
"""traverse and yield only outer-exposed column elements, such as would
be addressable in the WHERE clause of a SELECT if this element were
"WHERE mytable_1.myid = %(myid_1)s FOR UPDATE OF mytable_1",
)
+ table2 = table("table2", column("mytable_id"))
+ join = table2.join(table1, table2.c.mytable_id == table1.c.myid)
+ self.assert_compile(
+ join.select(table2.c.mytable_id == 7).with_for_update(of=[join]),
+ "SELECT table2.mytable_id, "
+ "mytable.myid, mytable.name, mytable.description "
+ "FROM table2 "
+ "JOIN mytable ON table2.mytable_id = mytable.myid "
+ "WHERE table2.mytable_id = %(mytable_id_1)s "
+ "FOR UPDATE OF mytable, table2",
+ )
+
+ join = table2.join(ta, table2.c.mytable_id == ta.c.myid)
+ self.assert_compile(
+ join.select(table2.c.mytable_id == 7).with_for_update(of=[join]),
+ "SELECT table2.mytable_id, "
+ "mytable_1.myid, mytable_1.name, mytable_1.description "
+ "FROM table2 "
+ "JOIN mytable AS mytable_1 "
+ "ON table2.mytable_id = mytable_1.myid "
+ "WHERE table2.mytable_id = %(mytable_id_1)s "
+ "FOR UPDATE OF mytable_1, table2",
+ )
+
def test_for_update_with_schema(self):
m = MetaData()
table1 = Table(