tmp = " FOR UPDATE"
if select._for_update_arg.of:
- # TODO: assuming simplistic c.table here
- tables = set(c.table for c in select._for_update_arg.of)
+ tables = util.OrderedSet(
+ c.table if isinstance(c, expression.ColumnClause)
+ else c for c in select._for_update_arg.of)
tmp += " OF " + ", ".join(
self.process(table, ashint=True)
for table in tables
expression, visitors
)
from ..sql.base import ColumnCollection
-from ..sql import operators
from . import properties
__all__ = ['Query', 'QueryContext', 'aliased']
_with_labels = False
_criterion = None
_yield_per = None
- _lockmode = None
_order_by = False
_group_by = False
_having = None
_prefixes = None
_offset = None
_limit = None
+ _for_update_arg = None
_statement = None
_correlate = frozenset()
_populate_existing = False
if not self._populate_existing and \
not mapper.always_refresh and \
- self._lockmode is None:
+ self._for_update_arg is None:
instance = loading.get_from_identity(
self.session, key, attributes.PASSIVE_OFF)
@_generative()
def with_lockmode(self, mode):
- """Return a new Query object with the specified locking mode.
+ """Return a new :class:`.Query` object with the specified "locking mode",
+ which essentially refers to the ``FOR UPDATE`` clause.
.. deprecated:: 0.9.0b2 superseded by :meth:`.Query.with_for_update`.
- :param mode: a string representing the desired locking mode. A
- corresponding :meth:`~sqlalchemy.orm.query.LockmodeArgs` object
- is passed to the ``for_update`` parameter of
- :meth:`~sqlalchemy.sql.expression.select` when the
- query is executed. Valid values are:
+ :param mode: a string representing the desired locking mode.
+ Valid values are:
- ``None`` - translates to no lockmode
+ * ``None`` - translates to no lockmode
- ``'update'`` - translates to ``FOR UPDATE``
- (standard SQL, supported by most dialects)
+ * ``'update'`` - translates to ``FOR UPDATE``
+ (standard SQL, supported by most dialects)
- ``'update_nowait'`` - translates to ``FOR UPDATE NOWAIT``
- (supported by Oracle, PostgreSQL 8.1 upwards)
+ * ``'update_nowait'`` - translates to ``FOR UPDATE NOWAIT``
+ (supported by Oracle, PostgreSQL 8.1 upwards)
- ``'read'`` - translates to ``LOCK IN SHARE MODE`` (for MySQL),
- and ``FOR SHARE`` (for PostgreSQL)
+ * ``'read'`` - translates to ``LOCK IN SHARE MODE`` (for MySQL),
+ and ``FOR SHARE`` (for PostgreSQL)
- .. versionadded:: 0.7.7
- ``FOR SHARE`` and ``FOR SHARE NOWAIT`` (PostgreSQL).
+ .. seealso::
- :param of: either a column descriptor, or list of column
- descriptors, representing the optional OF part of the
- clause. This passes the descriptor to the
- corresponding :meth:`~sqlalchemy.orm.query.LockmodeArgs` object,
- and translates to ``FOR UPDATE OF table [NOWAIT]`` respectively
- ``FOR UPDATE OF table, table [NOWAIT]`` (PostgreSQL), or
- ``FOR UPDATE OF table.column [NOWAIT]`` respectively
- ``FOR UPDATE OF table.column, table.column [NOWAIT]`` (Oracle).
+ :meth:`.Query.with_for_update` - improved API for
+ specifying the ``FOR UPDATE`` clause.
- .. versionadded:: 0.9.0b2
"""
+ self._for_update_arg = LockmodeArg.parse_legacy_query(mode)
+
+ @_generative()
+ def with_for_update(self, read=False, nowait=False, of=None):
+ """return a new :class:`.Query` with the specified options for the
+ ``FOR UPDATE`` clause.
+
+ The behavior of this method is identical to that of
+ :meth:`.SelectBase.with_for_update`. When called with no arguments,
+ the resulting ``SELECT`` statement will have a ``FOR UPDATE`` clause
+ appended. When additional arguments are specified, backend-specific
+ options such as ``FOR UPDATE NOWAIT`` or ``LOCK IN SHARE MODE``
+ can take effect.
+
+ E.g.::
+
+ q = sess.query(User).with_for_update(nowait=True, of=User)
+
+ The above query on a Postgresql backend will render like::
+
+ SELECT users.id AS users_id FROM users FOR UPDATE OF users NOWAIT
+
+ .. versionadded:: 0.9.0b2 :meth:`.Query.with_for_update` supersedes
+ the :meth:`.Query.with_lockmode` method.
+
+ .. seealso::
+
+ :meth:`.SelectBase.with_for_update` - Core level method with
+ full argument and behavioral description.
- self._lockmode = LockmodeArgs(mode=mode, of=of)
+ """
+ self._for_update_arg = LockmodeArg(read=read, nowait=nowait, of=of)
@_generative()
def params(self, *args, **kwargs):
context.labels = labels
- if isinstance(self._lockmode, bool) and self._lockmode:
- context.for_update = LockmodeArgs(mode='update')
- elif isinstance(self._lockmode, LockmodeArgs):
- if self._lockmode.mode not in LockmodeArgs.lockmodes:
- raise sa_exc.ArgumentError('Unknown lockmode %r' % self._lockmode.mode)
- context.for_update = self._lockmode
+ context._for_update_arg = self._for_update_arg
for entity in self._entities:
entity.setup_context(self, context)
statement = sql.select(
[inner] + context.secondary_columns,
- for_update=context.for_update,
use_labels=context.labels)
+ statement._for_update_arg = context._for_update_arg
+
from_clause = inner
for eager_join in context.eager_joins.values():
# EagerLoader places a 'stop_on' attribute on the join,
context.whereclause,
from_obj=context.froms,
use_labels=context.labels,
- for_update=context.for_update,
order_by=context.order_by,
**self._select_args
)
+ statement._for_update_arg = context._for_update_arg
+
for hint in self._with_hints:
statement = statement.with_hint(*hint)
def __str__(self):
return str(self._compile_context().statement)
+from ..sql.selectable import ForUpdateArg
+
+class LockmodeArg(ForUpdateArg):
+ @classmethod
+ def parse_legacy_query(self, mode):
+ if mode in (None, False):
+ return None
+
+ if mode == "read":
+ read = True
+ nowait = False
+ elif mode == "update":
+ read = nowait = False
+ elif mode == "update_nowait":
+ nowait = True
+ read = False
+ else:
+ raise sa_exc.ArgumentError(
+ "Unknown with_lockmode argument: %r" % mode)
+
+ return LockmodeArg(read=read, nowait=nowait)
class _QueryEntity(object):
"""represent an entity column returned within a Query result."""
``FOR SHARE NOWAIT`` (PostgreSQL).
"""
- if arg is None:
+ if arg in (None, False):
return None
nowait = read = False
self.nowait = nowait
self.read = read
if of is not None:
- self.of = [_only_column_elements(elem, "of")
+ self.of = [_interpret_as_column_or_from(elem)
for elem in util.to_list(of)]
else:
self.of = None
@property
def for_update(self):
- """Provide legacy dialect support for the ``for_update`` attribute
- as a getter.
-
+ """Provide legacy dialect support for the ``for_update`` attribute.
"""
if self._for_update_arg is not None:
return self._for_update_arg.legacy_for_update_value
else:
return None
+ @for_update.setter
+ def for_update(self, value):
+ self._for_update_arg = ForUpdateArg.parse_legacy_select(value)
+
@_generative
def with_for_update(self, nowait=False, read=False, of=None):
- """apply FOR UPDATE to this :class:`.SelectBase`.
+ """Specify a ``FOR UPDATE`` clause for this :class:`.SelectBase`.
E.g.::
stmt = select([table]).with_for_update(nowait=True)
- Additional keyword arguments are provided for common database-specific
+ On a database like Postgresql or Oracle, the above would render a
+ statement like::
+
+ SELECT table.a, table.b FROM table FOR UPDATE NOWAIT
+
+ on other backends, the ``nowait`` option is ignored and instead
+ would produce::
+
+ SELECT table.a, table.b FROM table FOR UPDATE
+
+ When called with no arguments, the statement will render with
+ the suffix ``FOR UPDATE``. Additional arguments can then be
+ provided which allow for common database-specific
variants.
:param nowait: boolean; will render ``FOR UPDATE NOWAIT`` on Oracle and
``FOR SHARE`` on Postgresql. On Postgresql, when combined with
``nowait``, will render ``FOR SHARE NOWAIT``.
- :param of: SQL expression or list of SQL expression elements which
+ :param of: SQL expression or list of SQL expression elements
+ (typically :class:`.Column` objects or a compatible expression) which
will render into a ``FOR UPDATE OF`` clause; supported by PostgreSQL
- and Oracle. May render as a table or as a column depending on
+ and Oracle. May render as a table or as a column depending on
backend.
-
.. versionadded:: 0.9.0b2
"""
resulting statement.
.. deprecated:: 0.9.0 - use :meth:`.SelectBase.with_for_update`
- to specify for update arguments.
+ to specify the structure of the ``FOR UPDATE`` clause.
- Additional values are accepted here, including:
+ ``for_update`` accepts various string values interpreted by
+ specific backends, including:
- ``None`` - translates to no lockmode
+ * ``"read"`` - on MySQL, translates to ``LOCK IN SHARE MODE``;
+ on Postgresql, translates to ``FOR SHARE``.
+ * ``"nowait"`` - on Postgresql and Oracle, translates to
+ ``FOR UPDATE NOWAIT``.
+ * ``"read_nowait"`` - on Postgresql, translates to
+ ``FOR SHARE NOWAIT``.
- ``'update'`` - translates to ``FOR UPDATE``
- (standard SQL, supported by most dialects)
-
- ``'update_nowait'`` - translates to ``FOR UPDATE NOWAIT``
- (supported by Oracle, PostgreSQL 8.1 upwards)
-
- ``'read'`` - translates to ``LOCK IN SHARE MODE`` (for MySQL),
- and ``FOR SHARE`` (for PostgreSQL)
-
- ``'read_nowait'`` - translates to ``FOR SHARE NOWAIT``
- (supported by PostgreSQL). ``FOR SHARE`` and
- ``FOR SHARE NOWAIT`` (PostgreSQL).
+ .. seealso::
- The :meth:`.SelectBase.with_for_update` method should be preferred as
- a means to specify FOR UPDATE more simply.
+ :meth:`.SelectBase.with_for_update` - improved API for
+ specifying the ``FOR UPDATE`` clause.
:param group_by:
a list of :class:`.ClauseElement` objects which will comprise the
from . import util as testutil
from sqlalchemy import pool, orm, util
-from sqlalchemy.engine import default, create_engine
+from sqlalchemy.engine import default, create_engine, url
from sqlalchemy import exc as sa_exc
from sqlalchemy.util import decorator
from sqlalchemy import types as sqltypes, schema
elif dialect == 'default':
dialect = default.DefaultDialect()
elif isinstance(dialect, util.string_types):
- dialect = create_engine("%s://" % dialect).dialect
+ dialect = url.URL(dialect).get_dialect()()
kw = {}
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR UPDATE OF mytable")
+ self.assert_compile(
+ table1.select(table1.c.myid == 7).
+ with_for_update(read=True, nowait=True, of=table1),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable WHERE mytable.myid = %(myid_1)s "
+ "FOR SHARE OF mytable NOWAIT")
+
self.assert_compile(
table1.select(table1.c.myid == 7).
with_for_update(read=True, nowait=True, of=table1.c.myid),
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = oracle.dialect()
+ __dialect__ = "oracle" #oracle.dialect()
def test_true_false(self):
self.assert_compile(
from sqlalchemy.databases import *
from sqlalchemy.orm import mapper
from sqlalchemy.orm import Session
-from sqlalchemy.testing import AssertsCompiledSQL
+from sqlalchemy.testing import AssertsCompiledSQL, eq_
from sqlalchemy.testing import assert_raises_message
+from sqlalchemy import exc
from test.orm import _fixtures
-class LockModeTest(_fixtures.FixtureTest, AssertsCompiledSQL):
+class LegacyLockModeTest(_fixtures.FixtureTest):
run_inserts = None
@classmethod
User, users = cls.classes.User, cls.tables.users
mapper(User, users)
- def test_default_update(self):
+ def _assert_legacy(self, arg, read=False, nowait=False):
+ User = self.classes.User
+ s = Session()
+ q = s.query(User).with_lockmode(arg)
+ sel = q._compile_context().statement
+
+ if arg is None:
+ assert q._for_update_arg is None
+ assert sel._for_update_arg is None
+ return
+
+ assert q._for_update_arg.read is read
+ assert q._for_update_arg.nowait is nowait
+
+ assert sel._for_update_arg.read is read
+ assert sel._for_update_arg.nowait is nowait
+
+ def test_false_legacy(self):
+ self._assert_legacy(None)
+
+ def test_plain_legacy(self):
+ self._assert_legacy("update")
+
+ def test_nowait_legacy(self):
+ self._assert_legacy("update_nowait", nowait=True)
+
+ def test_read_legacy(self):
+ self._assert_legacy("read", read=True)
+
+ def test_unknown_legacy_lock_mode(self):
User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update'),
- "SELECT users.id AS users_id FROM users FOR UPDATE",
- dialect=default.DefaultDialect()
+ assert_raises_message(
+ exc.ArgumentError, "Unknown with_lockmode argument: 'unknown_mode'",
+ sess.query(User.id).with_lockmode, 'unknown_mode'
)
- def test_not_supported_by_dialect_should_just_use_update(self):
+class ForUpdateTest(_fixtures.FixtureTest):
+ @classmethod
+ def setup_mappers(cls):
+ User, users = cls.classes.User, cls.tables.users
+ mapper(User, users)
+
+ def _assert(self, read=False, nowait=False, of=None,
+ assert_q_of=None, assert_sel_of=None):
User = self.classes.User
- sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('read'),
- "SELECT users.id AS users_id FROM users FOR UPDATE",
- dialect=default.DefaultDialect()
+ s = Session()
+ q = s.query(User).with_for_update(read=read, nowait=nowait, of=of)
+ sel = q._compile_context().statement
+
+ assert q._for_update_arg.read is read
+ assert sel._for_update_arg.read is read
+
+ assert q._for_update_arg.nowait is nowait
+ assert sel._for_update_arg.nowait is nowait
+
+ eq_(q._for_update_arg.of, assert_q_of)
+ eq_(sel._for_update_arg.of, assert_sel_of)
+
+ def test_read(self):
+ self._assert(read=True)
+
+ def test_plain(self):
+ self._assert()
+
+ def test_nowait(self):
+ self._assert(nowait=True)
+
+ def test_of_single_col(self):
+ User, users = self.classes.User, self.tables.users
+ self._assert(
+ of=User.id,
+ assert_q_of=[users.c.id],
+ assert_sel_of=[users.c.id]
)
- def test_none_lock_mode(self):
+class CompileTest(_fixtures.FixtureTest, AssertsCompiledSQL):
+ """run some compile tests, even though these are redundant."""
+ run_inserts = None
+
+ @classmethod
+ def setup_mappers(cls):
+ User, users = cls.classes.User, cls.tables.users
+ Address, addresses = cls.classes.Address, cls.tables.addresses
+ mapper(User, users)
+ mapper(Address, addresses)
+
+ def test_default_update(self):
User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode(None),
- "SELECT users.id AS users_id FROM users",
+ self.assert_compile(sess.query(User.id).with_for_update(),
+ "SELECT users.id AS users_id FROM users FOR UPDATE",
dialect=default.DefaultDialect()
)
- def test_unknown_lock_mode(self):
+ def test_not_supported_by_dialect_should_just_use_update(self):
User = self.classes.User
sess = Session()
- assert_raises_message(
- Exception, "Unknown lockmode 'unknown_mode'",
- self.assert_compile,
- sess.query(User.id).with_lockmode('unknown_mode'), None,
+ self.assert_compile(sess.query(User.id).with_for_update(read=True),
+ "SELECT users.id AS users_id FROM users FOR UPDATE",
dialect=default.DefaultDialect()
)
def test_postgres_read(self):
User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('read'),
+ self.assert_compile(sess.query(User.id).with_for_update(read=True),
"SELECT users.id AS users_id FROM users FOR SHARE",
- dialect=postgresql.dialect()
+ dialect="postgresql"
)
def test_postgres_read_nowait(self):
User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('read_nowait'),
+ self.assert_compile(sess.query(User.id).
+ with_for_update(read=True, nowait=True),
"SELECT users.id AS users_id FROM users FOR SHARE NOWAIT",
- dialect=postgresql.dialect()
+ dialect="postgresql"
)
def test_postgres_update(self):
User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update'),
+ self.assert_compile(sess.query(User.id).with_for_update(),
"SELECT users.id AS users_id FROM users FOR UPDATE",
- dialect=postgresql.dialect()
+ dialect="postgresql"
)
def test_postgres_update_of(self):
User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).for_update(of=User.id),
+ self.assert_compile(sess.query(User.id).with_for_update(of=User.id),
"SELECT users.id AS users_id FROM users FOR UPDATE OF users",
- dialect=postgresql.dialect()
+ dialect="postgresql"
)
- def test_postgres_update_of_list(self):
+ def test_postgres_update_of_entity(self):
User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).for_update(of=[User.id, User.id, User.id]),
+ self.assert_compile(sess.query(User.id).with_for_update(of=User),
"SELECT users.id AS users_id FROM users FOR UPDATE OF users",
- dialect=postgresql.dialect()
+ dialect="postgresql"
)
-
- def test_postgres_update_nowait(self):
+ def test_postgres_update_of_entity_list(self):
User = self.classes.User
- sess = Session()
- self.assert_compile(sess.query(User.id).for_updatewith_lockmode('update_nowait'),
- "SELECT users.id AS users_id FROM users FOR UPDATE NOWAIT",
- dialect=postgresql.dialect()
- )
+ Address = self.classes.Address
- def test_postgres_update_nowait_of(self):
- User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update_nowait', of=User.id),
- "SELECT users.id AS users_id FROM users FOR UPDATE OF users NOWAIT",
- dialect=postgresql.dialect()
+ self.assert_compile(sess.query(User.id, Address.id).
+ with_for_update(of=[User, Address]),
+ "SELECT users.id AS users_id, addresses.id AS addresses_id "
+ "FROM users, addresses FOR UPDATE OF users, addresses",
+ dialect="postgresql"
)
- def test_postgres_update_nowait_of_list(self):
+ def test_postgres_update_of_list(self):
User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update_nowait', of=[User.id, User.id, User.id]),
- "SELECT users.id AS users_id FROM users FOR UPDATE OF users, users, users NOWAIT",
- dialect=postgresql.dialect()
+ self.assert_compile(sess.query(User.id).
+ with_for_update(of=[User.id, User.id, User.id]),
+ "SELECT users.id AS users_id FROM users FOR UPDATE OF users",
+ dialect="postgresql"
)
+
def test_oracle_update(self):
User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update'),
+ self.assert_compile(sess.query(User.id).with_for_update(),
"SELECT users.id AS users_id FROM users FOR UPDATE",
- dialect=oracle.dialect()
- )
-
- def test_oracle_update_of(self):
- User = self.classes.User
- sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update', of=User.id),
- "SELECT users.id AS users_id FROM users FOR UPDATE OF users.id",
- dialect=oracle.dialect()
- )
-
- def test_oracle_update_of_list(self):
- User = self.classes.User
- sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update', of=[User.id, User.id, User.id]),
- "SELECT users.id AS users_id FROM users FOR UPDATE OF users.id, users.id, users.id",
- dialect=oracle.dialect()
- )
-
- def test_oracle_update_nowait(self):
- User = self.classes.User
- sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update_nowait'),
- "SELECT users.id AS users_id FROM users FOR UPDATE NOWAIT",
- dialect=oracle.dialect()
- )
-
- def test_oracle_update_nowait_of(self):
- User = self.classes.User
- sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update_nowait', of=User.id),
- "SELECT users.id AS users_id FROM users FOR UPDATE OF users.id NOWAIT",
- dialect=oracle.dialect()
- )
-
- def test_oracle_update_nowait_of_list(self):
- User = self.classes.User
- sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update_nowait', of=[User.id, User.id, User.id]),
- "SELECT users.id AS users_id FROM users FOR UPDATE OF users.id, users.id, users.id NOWAIT",
- dialect=oracle.dialect()
+ dialect="oracle"
)
def test_mysql_read(self):
User = self.classes.User
sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('read'),
+ self.assert_compile(sess.query(User.id).with_for_update(read=True),
"SELECT users.id AS users_id FROM users LOCK IN SHARE MODE",
- dialect=mysql.dialect()
- )
-
- def test_mysql_update(self):
- User = self.classes.User
- sess = Session()
- self.assert_compile(sess.query(User.id).with_lockmode('update'),
- "SELECT users.id AS users_id FROM users FOR UPDATE",
- dialect=mysql.dialect()
+ dialect="mysql"
)
meth, q, *arg, **kw
)
+
class OperatorTest(QueryTest, AssertsCompiledSQL):
"""test sql.Comparator implementation for MapperProperties"""
def _assert_legacy(self, leg, read=False, nowait=False):
t = table('t', column('c'))
s1 = select([t], for_update=leg)
+
if leg is False:
assert s1._for_update_arg is None
assert s1.for_update is None
def test_read_nowait_legacy(self):
self._assert_legacy("read_nowait", read=True, nowait=True)
+ def test_legacy_setter(self):
+ t = table('t', column('c'))
+ s = select([t])
+ s.for_update = 'nowait'
+ eq_(s._for_update_arg.nowait, True)
+
def test_basic_clone(self):
t = table('t', column('c'))
s = select([t]).with_for_update(read=True, of=t.c.c)