Declarative becomes a first class API
=====================================
-.. admonition:: Certainty: almost definitely
+.. admonition:: Certainty: definite
- Declarative is already what all the ORM documentation refers towards
- so it doesn't even make sense that it's an "ext". The hardest part will
- be integrating the declarative documentation appropriately.
+ This is now committed in master and the new documentation can be seen at
+ :ref:`orm_mapping_classes_toplevel`.
Declarative will now be part of ``sqlalchemy.orm`` in 2.0, and in 1.4 the
new version will be present in ``sqlalchemy.future.orm``. The concept
of the ``Base`` class will remain and do the same thing it already does;
however, it will also gain some new capabilities.
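As a minimal sketch (assuming SQLAlchemy 1.4 or later, with an illustrative
class and table name), the new import location looks like:

```python
# Hypothetical minimal example: Declarative imported from sqlalchemy.orm
# (available as of 1.4) rather than the legacy sqlalchemy.ext.declarative.
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
```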
+.. seealso::
-The original "mapper()" function removed; replaced with a Declarative compatibility function
-============================================================================================
-
-.. admonition:: Certainty: tentative
-
- The proposal to have "mapper()" be a sub-function of declarative simplifies
- the codepaths towards a class becoming mapped. The "classical mapping"
- pattern doesn't really have that much usefulness, however as some users have
- expressed their preference for it, the same code pattern will continue to
- be available, just on top of declarative. Hopefully it should be a little
- nicer even.
-
-Declarative has become very capable and in fact a mapping that is set up with
-declarative may have a superior configuration than one made with ``mapper()`` alone.
-Features that make a declarative mapping superior include:
-
-* The declarative mapping has a reference to the "class registry", which is a
- local set of classes that can then be accessed configurationally via strings
- when configuring inter-class relationships. Put another way, using declarative
- you can say ``relationship("SomeClass")``, and the string name ``"SomeClass"``
- is late-resolved to the actual mapped class ``SomeClass``.
-
-* Declarative provides convenience hooks on mapped classes such as
- ``__declare_first__`` and ``__declare_last__``. It also allows for
- mixins and ``__abstract__`` classes which provide for superior organization
- of classes and attributes.
-
-* Declarative sets parameters on the underlying ``mapper()`` that allow for
- better behaviors. A key example is when configuring single table
- inheritance, and a particular table column is local to a subclass, Declarative
- automatically sets up ``exclude_columns`` on the base class and other sibling
- classes that don't include those columns.
-
-* Declarative also ensures that "inherits" is configured appropriately for
- mappers against inherited classes and checks for several other conditions
- that can only be determined by the fact that Declarative scans table information
- from the mapped class itself.
-
-Some of the above Declarative capabilities are lost when one declares their
-mapping using ``__table__``, however the class registry and special hooks
-are still available. Declarative does not in fact depend on the use of
-a special base class or metaclass, this is just the API that is currently
-used. An alternative API that behaves just like ``mapper()`` can be defined
-right now as follows::
-
- # 1.xx code
-
- from sqlalchemy.ext.declarative import base
- def declarative_mapper():
- _decl_class_registry = {}
-
- def mapper(cls, table, properties={}):
- cls.__table__ = table
- cls._decl_class_registry = _decl_class_registry
- for key, value in properties.items():
- setattr(cls, key, value)
- base._as_declarative(cls, cls.__name__, cls.__dict__)
+ :ref:`orm_mapping_classes_toplevel` - all new unified documentation for
+ Declarative, classical mapping, dataclasses, attrs, etc.
- return mapper
- # mapper here is the mapper() function
- mapper = declarative_mapper()
+The original "mapper()" function now a core element of Declarative, renamed
+===========================================================================
-Above, the ``mapper()`` callable is using a class registry that's local
-to where the ``declarative_mapper()`` function was called. However, we
-can just as easily add the above ``mapper()`` function to any declarative base,
-to make for a pattern such as::
+.. admonition:: Certainty: definite
- from sqlalchemy.future.orm import declarative_base
+ This is now committed in master and the new documentation can be seen at
+ :ref:`orm_mapping_classes_toplevel`.
- base = declarative_base()
+By popular demand, "classical mapping" is staying around; however, the new
+form of it is based on the :class:`_orm.registry` object and is available
+as :meth:`_orm.registry.map_imperatively`.
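A short sketch of the new imperative form (assuming the 1.4 API; the class
and table names here are illustrative):

```python
# Sketch of the new imperative ("classical") mapping form: the
# registry.map_imperatively() method takes the place of the old mapper().
from sqlalchemy import Column, Integer, String, Table
from sqlalchemy.orm import registry

mapper_registry = registry()

user_table = Table(
    "user",
    mapper_registry.metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)


class User:
    pass


# maps User to user_table without any declarative base class
mapper_registry.map_imperatively(User, user_table)
```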
- class MyClass(object):
- pass
+In addition, the primary rationale for "classical mapping" is that of
+keeping the :class:`_schema.Table` setup distinct from the class. Declarative
+has always allowed this style using so-called
+:ref:`hybrid declarative <orm_imperative_table_configuration>`. However,
+to remove the base class requirement, a first-class :ref:`decorator <declarative_config_toplevel>`
+form has been added.
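The decorator form can be sketched as follows (assuming the 1.4
:meth:`_orm.registry.mapped` API; names are illustrative):

```python
# Sketch of the decorator form: registry.mapped maps the class
# declaratively with no Base superclass required.
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import registry

mapper_registry = registry()


@mapper_registry.mapped
class User:
    __tablename__ = "user"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
```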
- my_table = Table("my_table", base.metadata, Column('id', Integer, primary_key=True))
+As a separate but related enhancement, support for :ref:`Python
+dataclasses <orm_declarative_dataclasses>` is added as well to both the
+declarative decorator and classical mapping forms.
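A sketch of the dataclass form (assuming the 1.4 convention in which
:class:`_schema.Column` objects ride along in ``dataclasses.field()``
metadata, keyed by ``__sa_dataclass_metadata_key__``; names are
illustrative):

```python
# Sketch of mapping a Python dataclass with the declarative decorator.
from dataclasses import dataclass, field
from typing import Optional

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import registry

mapper_registry = registry()


@mapper_registry.mapped
@dataclass
class User:
    __tablename__ = "user"
    __sa_dataclass_metadata_key__ = "sa"

    # init=False: the primary key is generated, not passed to __init__()
    id: int = field(
        init=False, metadata={"sa": Column(Integer, primary_key=True)}
    )
    name: Optional[str] = field(
        default=None, metadata={"sa": Column(String(50))}
    )
```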
- # "classical" mapping:
- base.mapper(MyClass, my_table)
+.. seealso::
-In 2.0, an application that still wishes to use a separate :class:`_schema.Table` and
-does not want to use Declarative with ``__table__``, can instead use the above
-pattern which basically does the same thing.
+ :ref:`orm_mapping_classes_toplevel` - all new unified documentation for
+ Declarative, classical mapping, dataclasses, attrs, etc.
.. _migration_20_unify_select:
)
dialect = self.dialect
+
ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_compiled,
# textual statement with positional parameters to
# execute().
# execute(stmt, ("value", "value"))
-
return [zero]
elif hasattr(zero, "keys"):
# execute(stmt, {"key":"value"})
if shard_id is None:
shard_id = self._choose_shard_and_assign(mapper, instance)
- if self.transaction is not None:
- return self.transaction.connection(mapper, shard_id=shard_id)
+ if self.in_transaction():
+ return self.get_transaction().connection(mapper, shard_id=shard_id)
else:
return self.get_bind(
mapper, shard_id=shard_id, instance=instance
if bind is not None:
# util.deprecated_params does not work
util.warn_deprecated_20(
- 'The "bind" argument to declarative_base is'
+ "The ``bind`` argument to declarative_base is "
"deprecated and will be removed in SQLAlchemy 2.0.",
)
@util.deprecated_params(
bind=(
"2.0",
- 'The "bind" argument to declarative_base is'
+ "The ``bind`` argument to as_declarative is "
"deprecated and will be removed in SQLAlchemy 2.0.",
)
)
cols = []
for key in col_attribute_names:
cols.extend(props[key].columns)
- return sql.select(cols, cond, use_labels=True)
+ return sql.select(*cols).where(cond).apply_labels()
def _iterate_to_target_viawpoly(self, mapper):
if self.isa(mapper):
self.enable_eagerloads(False)
.add_columns(sql.literal_column("1"))
.with_labels()
- .statement.with_only_columns([1])
+ .statement.with_only_columns(1)
)
ezero = self._entity_from_pre_ent_zero()
bulk_ud.query = new_query
self = bulk_ud.query
- upd = sql.update(*self._raw_columns, **update_args).values(values)
+ upd = sql.update(*self._raw_columns)
+
+ ppo = update_args.pop("preserve_parameter_order", False)
+ if ppo:
+ upd = upd.ordered_values(*values)
+ else:
+ upd = upd.values(values)
+ if update_args:
+ upd = upd.with_dialect_options(**update_args)
+
upd._where_criteria = self._where_criteria
result = self.session.execute(
upd,
result = []
for type_, table in table_map.items():
if typecolname is not None:
- cols = [col(name, table) for name in colnames]
- cols.append(
- sql.literal_column(sql_util._quote_ddl_expr(type_)).label(
- typecolname
- ),
+ result.append(
+ sql.select(
+ *(
+ [col(name, table) for name in colnames]
+ + [
+ sql.literal_column(
+ sql_util._quote_ddl_expr(type_)
+ ).label(typecolname)
+ ]
+ )
+ ).select_from(table)
)
- result.append(sql.select(*cols).select_from(table))
else:
result.append(
sql.select(
"The :paramref:`%(func)s.whereclause` parameter "
"will be removed "
"in SQLAlchemy 2.0. Please refer to the "
- ":meth:`.%(classname)s.where` method."
+ ":meth:`%(classname)s.where` method."
),
values=(
"The :paramref:`%(func)s.values` parameter will be removed "
def _validate_dialect_kwargs_deprecated(self, dialect_kw):
util.warn_deprecated_20(
"Passing dialect keyword arguments directly to the "
- "constructor is deprecated and will be removed in SQLAlchemy "
+ "%s constructor is deprecated and will be removed in SQLAlchemy "
"2.0. Please use the ``with_dialect_options()`` method."
+ % (self.__class__.__name__)
)
self._validate_dialect_kwargs(dialect_kw)
"""
return self.with_only_columns(
- util.preloaded.sql_util.reduce_columns(
+ *util.preloaded.sql_util.reduce_columns(
self._exported_columns_iterator(),
only_synonyms=only_synonyms,
*(self._where_criteria + self._from_obj)
sa.orm.clear_mappers()
+def create_session(**kw):
+ kw.setdefault("autoflush", False)
+ kw.setdefault("expire_on_commit", False)
+ return sa.orm.Session(config.db, **kw)
+
+
class ORMTest(_ORMTest, TestBase):
pass
# likely due to the introduction of __signature__.
from sqlalchemy.util import decorator
+ from sqlalchemy.util import deprecations
+ from sqlalchemy.engine import row
+ from sqlalchemy.testing import mock
@decorator
def wrap(fn, *args, **kw):
- for warm in range(warmup):
- fn(*args, **kw)
-
- timerange = range(times)
- with count_functions(variance=variance):
- for time in timerange:
- rv = fn(*args, **kw)
- return rv
+
+ with mock.patch.object(
+ deprecations, "SQLALCHEMY_WARN_20", False
+ ), mock.patch.object(
+ row.LegacyRow, "_default_key_style", row.KEY_OBJECTS_NO_WARN
+ ):
+ for warm in range(warmup):
+ fn(*args, **kw)
+
+ timerange = range(times)
+ with count_functions(variance=variance):
+ for time in timerange:
+ rv = fn(*args, **kw)
+ return rv
return wrap
)
connection.execute(
- table.insert(inline=True).from_select(
+ table.insert()
+ .inline()
+ .from_select(
("id", "data"),
select(table.c.id + 5, table.c.data).where(
table.c.data.in_(["data2", "data3"])
)
connection.execute(
- table.insert(inline=True).from_select(
+ table.insert()
+ .inline()
+ .from_select(
("id", "data"),
select(table.c.id + 5, table.c.data).where(
table.c.data.in_(["data2", "data3"])
message=r"^The (Sybase|firebird) dialect is deprecated and will be",
)
+ # 2.0 deprecation warnings; eventually all of these should be
+ # "error", however for I98b8defdf7c37b818b3824d02f7668e3f5f31c94
+ # we are moving one at a time
+ for msg in [
+ #
+ # Core execution
+ #
+ r"The (?:Executable|Engine)\.(?:execute|scalar)\(\) function",
+ r"The current statement is being autocommitted using implicit "
+ "autocommit,",
+ r"The connection.execute\(\) method in SQLAlchemy 2.0 will accept "
+ "parameters as a single dictionary or a single sequence of "
+ "dictionaries only.",
+ r"The Connection.connect\(\) function/method is considered legacy",
+ r".*DefaultGenerator.execute\(\)",
+ #
+ # result sets
+ #
+ r"The Row.keys\(\) function/method",
+ r"Using non-integer/slice indices on Row ",
+ #
+ # Core SQL constructs
+ #
+ r"The FromClause\.select\(\).whereclause parameter is deprecated",
+ r"Set functions such as union\(\), union_all\(\), extract\(\),",
+ r"The legacy calling style of select\(\) is deprecated and will be "
+ "removed",
+ r"The FromClause.select\(\) method will no longer accept keyword "
+ "arguments in version 2.0",
+ r"The Join.select\(\) method will no longer accept keyword arguments "
+ "in version 2.0.",
+ r"The \"whens\" argument to case\(\) is now passed",
+ r"The Join.select\(\).whereclause parameter is deprecated",
+ #
+ # DML
+ #
+ r"The (?:update|delete).whereclause parameter will be removed in "
+ "SQLAlchemy 2.0.",
+ r"The (?:insert|update).values parameter will be removed in "
+ "SQLAlchemy 2.0.",
+ r"The update.preserve_parameter_order parameter will be removed in "
+ "SQLAlchemy 2.0.",
+ r"Passing dialect keyword arguments directly to the "
+ "(?:Insert|Update|Delete) constructor",
+ #
+ # ORM configuration
+ #
+ r"Calling the mapper\(\) function directly outside of a "
+ "declarative registry",
+ r"The ``declarative_base\(\)`` function is now available ",
+ r"The ``has_inherited_table\(\)`` function is now available",
+ r"The ``bind`` argument to declarative_base is deprecated and will "
+ "be removed in SQLAlchemy 2.0.",
+ #
+ # ORM Query
+ #
+ r"The Query\.get\(\) function",
+ r"The Query\.from_self\(\) function",
+ #
+ # ORM Session
+ #
+ r"The Session.autocommit parameter is deprecated ",
+ r".*object is being merged into a Session along the backref "
+ "cascade path",
+ r"Passing bind arguments to Session.execute\(\) as keyword arguments",
+ r"The Session.transaction function/method",
+ r"The merge_result\(\) method is superseded by the "
+ r"merge_frozen_result\(\)",
+ r"The Session.begin.subtransactions flag is deprecated",
+ ]:
+ warnings.filterwarnings(
+ "ignore", message=msg, category=sa_exc.RemovedIn20Warning,
+ )
+
try:
import pytest
except ImportError:
t = self.tables.data
- ins = t.insert(inline=True).values(
- id=bindparam("id"),
- x=select(literal_column("5"))
- .select_from(self.tables.data)
- .scalar_subquery(),
- y=bindparam("y"),
- z=bindparam("z"),
+ ins = (
+ t.insert()
+ .inline()
+ .values(
+ id=bindparam("id"),
+ x=select(literal_column("5"))
+ .select_from(self.tables.data)
+ .scalar_subquery(),
+ y=bindparam("y"),
+ z=bindparam("z"),
+ )
)
# compiled SQL has a newline in it
eq_(
t = self.tables.data
- ins = t.insert(inline=True).values(
- id=bindparam("id"),
- x=select(literal_column("5"))
- .select_from(self.tables.data)
- .scalar_subquery(),
- y=bindparam("y"),
- z=bindparam("z"),
+ ins = (
+ t.insert()
+ .inline()
+ .values(
+ id=bindparam("id"),
+ x=select(literal_column("5"))
+ .select_from(self.tables.data)
+ .scalar_subquery(),
+ y=bindparam("y"),
+ z=bindparam("z"),
+ )
)
# compiled SQL has a newline in it
eq_(
# single execute, explicit id, inline
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
# single execute, inline, uses SERIAL
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
{"id": 32, "data": "d4"},
)
conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
# single execute, explicit id, inline
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
# single execute, inline, uses SERIAL
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
{"id": 32, "data": "d4"},
)
conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
{"id": 32, "data": "d4"},
)
conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
{"id": 32, "data": "d4"},
)
conn.execute(table.insert(), {"data": "d5"}, {"data": "d6"})
- conn.execute(
- table.insert(inline=True), {"id": 33, "data": "d7"}
- )
- conn.execute(table.insert(inline=True), {"data": "d8"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d7"})
+ conn.execute(table.insert().inline(), {"data": "d8"})
asserter.assert_(
DialectSQL(
table.insert(),
[{"id": 31, "data": "d2"}, {"id": 32, "data": "d3"}],
)
- conn.execute(table.insert(inline=True), {"id": 33, "data": "d4"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d4"})
eq_(
conn.execute(table.select()).fetchall(),
[(30, "d1"), (31, "d2"), (32, "d3"), (33, "d4")],
table.insert(),
[{"id": 31, "data": "d2"}, {"id": 32, "data": "d3"}],
)
- conn.execute(table.insert(inline=True), {"id": 33, "data": "d4"})
+ conn.execute(table.insert().inline(), {"id": 33, "data": "d4"})
eq_(
conn.execute(table.select()).fetchall(),
[(30, "d1"), (31, "d2"), (32, "d3"), (33, "d4")],
)
metadata.create_all()
with testing.db.connect() as conn:
- conn.execute("CREATE INDEX idx1 ON t (x) INCLUDE (name)")
+ conn.exec_driver_sql("CREATE INDEX idx1 ON t (x) INCLUDE (name)")
# prior to #5205, this would return:
# [{'column_names': ['x', 'name'],
).fetchall()
eq_([d for d, in data], [None])
- def _test_insert(self, engine):
- with engine.connect() as conn:
- conn.execute(
- self.tables.data_table.insert(),
- {"name": "r1", "data": {"k1": "r1v1", "k2": "r1v2"}},
- )
- self._assert_data([{"k1": "r1v1", "k2": "r1v2"}], conn)
+ def _test_insert(self, conn):
+ conn.execute(
+ self.tables.data_table.insert(),
+ {"name": "r1", "data": {"k1": "r1v1", "k2": "r1v2"}},
+ )
+ self._assert_data([{"k1": "r1v1", "k2": "r1v2"}], conn)
- def _test_insert_nulls(self, engine):
- with engine.connect() as conn:
- conn.execute(
- self.tables.data_table.insert(), {"name": "r1", "data": null()}
- )
- self._assert_data([None], conn)
+ def _test_insert_nulls(self, conn):
+ conn.execute(
+ self.tables.data_table.insert(), {"name": "r1", "data": null()}
+ )
+ self._assert_data([None], conn)
- def _test_insert_none_as_null(self, engine):
- with engine.connect() as conn:
- conn.execute(
- self.tables.data_table.insert(),
- {"name": "r1", "nulldata": None},
- )
- self._assert_column_is_NULL(conn, column="nulldata")
+ def _test_insert_none_as_null(self, conn):
+ conn.execute(
+ self.tables.data_table.insert(), {"name": "r1", "nulldata": None},
+ )
+ self._assert_column_is_NULL(conn, column="nulldata")
- def _test_insert_nulljson_into_none_as_null(self, engine):
- with engine.connect() as conn:
- conn.execute(
- self.tables.data_table.insert(),
- {"name": "r1", "nulldata": JSON.NULL},
- )
- self._assert_column_is_JSON_NULL(conn, column="nulldata")
+ def _test_insert_nulljson_into_none_as_null(self, conn):
+ conn.execute(
+ self.tables.data_table.insert(),
+ {"name": "r1", "nulldata": JSON.NULL},
+ )
+ self._assert_column_is_JSON_NULL(conn, column="nulldata")
def test_reflect(self):
insp = inspect(testing.db)
def test_stmt_exception_bytestring_raised(self):
name = util.u("méil")
+ users = self.tables.users
with testing.db.connect() as conn:
assert_raises_message(
tsa.exc.StatementError,
with engine.connect() as conn:
conn.exec_driver_sql("%s something table something" % keyword)
- for _call in dbapi.connect().mock_calls:
- _call.kwargs.clear()
-
if expected:
- eq_(dbapi.connect().mock_calls, [call.cursor(), call.commit()])
+ eq_(
+ [n for (n, k, s) in dbapi.connect().mock_calls],
+ ["cursor", "commit"],
+ )
else:
- eq_(dbapi.connect().mock_calls, [call.cursor()])
+ eq_(
+ [n for (n, k, s) in dbapi.connect().mock_calls], ["cursor"]
+ )
class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase):
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import util
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import config
cls.c2 = c2 = Company(name="Elbonia, Inc.")
c2.employees = [e3]
- sess = create_session(connection)
- sess.add(c1)
- sess.add(c2)
- sess.flush()
- sess.expunge_all()
+ with sessionmaker(connection, expire_on_commit=False).begin() as sess:
+ sess.add(c1)
+ sess.add(c2)
cls.all_employees = [e1, e2, b1, m1, e3]
cls.c1_employees = [e1, e2, b1, m1]
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
+from sqlalchemy import testing
from sqlalchemy.orm import class_mapper
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import mapper
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
from sqlalchemy.orm.interfaces import MANYTOONE
from sqlalchemy.orm.interfaces import ONETOMANY
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-def produce_test(parent, child, direction):
- """produce a testcase for A->B->C inheritance with a self-referential
- relationship between two of the classes, using either one-to-many or
- many-to-one.
+def _combinations():
+ for parent in ["a", "b", "c"]:
+ for child in ["a", "b", "c"]:
+ for direction in [ONETOMANY, MANYTOONE]:
+
+ name = "Test%sTo%s%s" % (
+ parent,
+ child,
+ (direction is ONETOMANY and "O2M" or "M2O"),
+ )
+ yield (name, parent, child, direction)
- the old "no discriminator column" pattern is used.
- """
+@testing.combinations(
+ *list(_combinations()), argnames="name,parent,child,direction", id_="saaa"
+)
+class ABCTest(fixtures.MappedTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ parent, child, direction = cls.parent, cls.child, cls.direction
- class ABCTest(fixtures.MappedTest):
- @classmethod
- def define_tables(cls, metadata):
- global ta, tb, tc
- ta = ["a", metadata]
+ ta = ["a", metadata]
+ ta.append(
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True,
+ )
+ )
+ ta.append(Column("a_data", String(30)))
+ if "a" == parent and direction == MANYTOONE:
ta.append(
Column(
- "id",
+ "child_id",
Integer,
- primary_key=True,
- test_needs_autoincrement=True,
- )
- ),
- ta.append(Column("a_data", String(30)))
- if "a" == parent and direction == MANYTOONE:
- ta.append(
- Column(
- "child_id",
- Integer,
- ForeignKey(
- "%s.id" % child, use_alter=True, name="foo"
- ),
- )
+ ForeignKey("%s.id" % child, use_alter=True, name="foo"),
)
- elif "a" == child and direction == ONETOMANY:
- ta.append(
- Column(
- "parent_id",
- Integer,
- ForeignKey(
- "%s.id" % parent, use_alter=True, name="foo"
- ),
- )
+ )
+ elif "a" == child and direction == ONETOMANY:
+ ta.append(
+ Column(
+ "parent_id",
+ Integer,
+ ForeignKey("%s.id" % parent, use_alter=True, name="foo"),
)
- ta = Table(*ta)
-
- tb = ["b", metadata]
- tb.append(
- Column("id", Integer, ForeignKey("a.id"), primary_key=True)
)
+ ta = Table(*ta)
- tb.append(Column("b_data", String(30)))
-
- if "b" == parent and direction == MANYTOONE:
- tb.append(
- Column(
- "child_id",
- Integer,
- ForeignKey(
- "%s.id" % child, use_alter=True, name="foo"
- ),
- )
- )
- elif "b" == child and direction == ONETOMANY:
- tb.append(
- Column(
- "parent_id",
- Integer,
- ForeignKey(
- "%s.id" % parent, use_alter=True, name="foo"
- ),
- )
- )
- tb = Table(*tb)
+ tb = ["b", metadata]
+ tb.append(Column("id", Integer, ForeignKey("a.id"), primary_key=True))
- tc = ["c", metadata]
- tc.append(
- Column("id", Integer, ForeignKey("b.id"), primary_key=True)
- )
+ tb.append(Column("b_data", String(30)))
- tc.append(Column("c_data", String(30)))
-
- if "c" == parent and direction == MANYTOONE:
- tc.append(
- Column(
- "child_id",
- Integer,
- ForeignKey(
- "%s.id" % child, use_alter=True, name="foo"
- ),
- )
- )
- elif "c" == child and direction == ONETOMANY:
- tc.append(
- Column(
- "parent_id",
- Integer,
- ForeignKey(
- "%s.id" % parent, use_alter=True, name="foo"
- ),
- )
+ if "b" == parent and direction == MANYTOONE:
+ tb.append(
+ Column(
+ "child_id",
+ Integer,
+ ForeignKey("%s.id" % child, use_alter=True, name="foo"),
)
- tc = Table(*tc)
-
- def teardown(self):
- if direction == MANYTOONE:
- parent_table = {"a": ta, "b": tb, "c": tc}[parent]
- parent_table.update(
- values={parent_table.c.child_id: None}
- ).execute()
- elif direction == ONETOMANY:
- child_table = {"a": ta, "b": tb, "c": tc}[child]
- child_table.update(
- values={child_table.c.parent_id: None}
- ).execute()
- super(ABCTest, self).teardown()
-
- def test_roundtrip(self):
- parent_table = {"a": ta, "b": tb, "c": tc}[parent]
- child_table = {"a": ta, "b": tb, "c": tc}[child]
-
- remote_side = None
-
- if direction == MANYTOONE:
- foreign_keys = [parent_table.c.child_id]
- elif direction == ONETOMANY:
- foreign_keys = [child_table.c.parent_id]
-
- atob = ta.c.id == tb.c.id
- btoc = tc.c.id == tb.c.id
-
- if direction == ONETOMANY:
- relationshipjoin = parent_table.c.id == child_table.c.parent_id
- elif direction == MANYTOONE:
- relationshipjoin = parent_table.c.child_id == child_table.c.id
- if parent is child:
- remote_side = [child_table.c.id]
-
- abcjoin = polymorphic_union(
- {
- "a": ta.select(
- tb.c.id == None, # noqa
- from_obj=[ta.outerjoin(tb, onclause=atob)],
- ).subquery(),
- "b": ta.join(tb, onclause=atob)
- .outerjoin(tc, onclause=btoc)
- .select(tc.c.id == None)
- .reduce_columns()
- .subquery(), # noqa
- "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob),
- },
- "type",
- "abcjoin",
)
-
- bcjoin = polymorphic_union(
- {
- "b": ta.join(tb, onclause=atob)
- .outerjoin(tc, onclause=btoc)
- .select(tc.c.id == None)
- .reduce_columns()
- .subquery(), # noqa
- "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob),
- },
- "type",
- "bcjoin",
+ elif "b" == child and direction == ONETOMANY:
+ tb.append(
+ Column(
+ "parent_id",
+ Integer,
+ ForeignKey("%s.id" % parent, use_alter=True, name="foo"),
+ )
)
+ tb = Table(*tb)
- class A(object):
- def __init__(self, name):
- self.a_data = name
+ tc = ["c", metadata]
+ tc.append(Column("id", Integer, ForeignKey("b.id"), primary_key=True))
- class B(A):
- pass
+ tc.append(Column("c_data", String(30)))
- class C(B):
- pass
-
- mapper(
- A,
- ta,
- polymorphic_on=abcjoin.c.type,
- with_polymorphic=("*", abcjoin),
- polymorphic_identity="a",
- )
- mapper(
- B,
- tb,
- polymorphic_on=bcjoin.c.type,
- with_polymorphic=("*", bcjoin),
- polymorphic_identity="b",
- inherits=A,
- inherit_condition=atob,
- )
- mapper(
- C,
- tc,
- polymorphic_identity="c",
- with_polymorphic=("*", tc.join(tb, btoc).join(ta, atob)),
- inherits=B,
- inherit_condition=btoc,
+ if "c" == parent and direction == MANYTOONE:
+ tc.append(
+ Column(
+ "child_id",
+ Integer,
+ ForeignKey("%s.id" % child, use_alter=True, name="foo"),
+ )
)
-
- parent_mapper = class_mapper({ta: A, tb: B, tc: C}[parent_table])
- child_mapper = class_mapper({ta: A, tb: B, tc: C}[child_table])
-
- parent_class = parent_mapper.class_
- child_class = child_mapper.class_
-
- parent_mapper.add_property(
- "collection",
- relationship(
- child_mapper,
- primaryjoin=relationshipjoin,
- foreign_keys=foreign_keys,
- order_by=child_mapper.c.id,
- remote_side=remote_side,
- uselist=True,
- ),
+ elif "c" == child and direction == ONETOMANY:
+ tc.append(
+ Column(
+ "parent_id",
+ Integer,
+ ForeignKey("%s.id" % parent, use_alter=True, name="foo"),
+ )
)
-
- sess = create_session()
-
- parent_obj = parent_class("parent1")
- child_obj = child_class("child1")
- somea = A("somea")
- someb = B("someb")
- somec = C("somec")
-
- # print "APPENDING", parent.__class__.__name__ , "TO",
- # child.__class__.__name__
-
- sess.add(parent_obj)
- parent_obj.collection.append(child_obj)
- if direction == ONETOMANY:
- child2 = child_class("child2")
- parent_obj.collection.append(child2)
- sess.add(child2)
- elif direction == MANYTOONE:
- parent2 = parent_class("parent2")
- parent2.collection.append(child_obj)
- sess.add(parent2)
- sess.add(somea)
- sess.add(someb)
- sess.add(somec)
- sess.flush()
- sess.expunge_all()
-
- # assert result via direct get() of parent object
- result = sess.query(parent_class).get(parent_obj.id)
- assert result.id == parent_obj.id
- assert result.collection[0].id == child_obj.id
- if direction == ONETOMANY:
- assert result.collection[1].id == child2.id
- elif direction == MANYTOONE:
- result2 = sess.query(parent_class).get(parent2.id)
- assert result2.id == parent2.id
- assert result2.collection[0].id == child_obj.id
-
- sess.expunge_all()
-
- # assert result via polymorphic load of parent object
- result = sess.query(A).filter_by(id=parent_obj.id).one()
- assert result.id == parent_obj.id
- assert result.collection[0].id == child_obj.id
- if direction == ONETOMANY:
- assert result.collection[1].id == child2.id
- elif direction == MANYTOONE:
- result2 = sess.query(A).filter_by(id=parent2.id).one()
- assert result2.id == parent2.id
- assert result2.collection[0].id == child_obj.id
-
- ABCTest.__name__ = "Test%sTo%s%s" % (
- parent,
- child,
- (direction is ONETOMANY and "O2M" or "M2O"),
- )
- return ABCTest
-
-
-# test all combinations of polymorphic a/b/c related to another of a/b/c
-for parent in ["a", "b", "c"]:
- for child in ["a", "b", "c"]:
- for direction in [ONETOMANY, MANYTOONE]:
- testclass = produce_test(parent, child, direction)
- exec("%s = testclass" % testclass.__name__)
- del testclass
-
-del produce_test
+ tc = Table(*tc)
+
+ @classmethod
+ def setup_mappers(cls):
+ parent, child, direction = cls.parent, cls.child, cls.direction
+ ta, tb, tc = cls.tables("a", "b", "c")
+ parent_table = {"a": ta, "b": tb, "c": tc}[parent]
+ child_table = {"a": ta, "b": tb, "c": tc}[child]
+
+ remote_side = None
+
+ if direction == MANYTOONE:
+ foreign_keys = [parent_table.c.child_id]
+ elif direction == ONETOMANY:
+ foreign_keys = [child_table.c.parent_id]
+
+ atob = ta.c.id == tb.c.id
+ btoc = tc.c.id == tb.c.id
+
+ if direction == ONETOMANY:
+ relationshipjoin = parent_table.c.id == child_table.c.parent_id
+ elif direction == MANYTOONE:
+ relationshipjoin = parent_table.c.child_id == child_table.c.id
+ if parent is child:
+ remote_side = [child_table.c.id]
+
+ abcjoin = polymorphic_union(
+ {
+ "a": ta.select()
+ .where(tb.c.id == None) # noqa
+ .select_from(ta.outerjoin(tb, onclause=atob))
+ .subquery(),
+ "b": ta.join(tb, onclause=atob)
+ .outerjoin(tc, onclause=btoc)
+ .select()
+ .where(tc.c.id == None)
+ .reduce_columns()
+ .subquery(), # noqa
+ "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob),
+ },
+ "type",
+ "abcjoin",
+ )
+
+ bcjoin = polymorphic_union(
+ {
+ "b": ta.join(tb, onclause=atob)
+ .outerjoin(tc, onclause=btoc)
+ .select()
+ .where(tc.c.id == None)
+ .reduce_columns()
+ .subquery(), # noqa
+ "c": tc.join(tb, onclause=btoc).join(ta, onclause=atob),
+ },
+ "type",
+ "bcjoin",
+ )
+
+ class A(cls.Comparable):
+ def __init__(self, name):
+ self.a_data = name
+
+ class B(A):
+ pass
+
+ class C(B):
+ pass
+
+ mapper(
+ A,
+ ta,
+ polymorphic_on=abcjoin.c.type,
+ with_polymorphic=("*", abcjoin),
+ polymorphic_identity="a",
+ )
+ mapper(
+ B,
+ tb,
+ polymorphic_on=bcjoin.c.type,
+ with_polymorphic=("*", bcjoin),
+ polymorphic_identity="b",
+ inherits=A,
+ inherit_condition=atob,
+ )
+ mapper(
+ C,
+ tc,
+ polymorphic_identity="c",
+ with_polymorphic=("*", tc.join(tb, btoc).join(ta, atob)),
+ inherits=B,
+ inherit_condition=btoc,
+ )
+
+ parent_mapper = class_mapper({ta: A, tb: B, tc: C}[parent_table])
+ child_mapper = class_mapper({ta: A, tb: B, tc: C}[child_table])
+
+ parent_mapper.add_property(
+ "collection",
+ relationship(
+ child_mapper,
+ primaryjoin=relationshipjoin,
+ foreign_keys=foreign_keys,
+ order_by=child_mapper.c.id,
+ remote_side=remote_side,
+ uselist=True,
+ ),
+ )
+
+ def test_roundtrip(self):
+ parent, child, direction = self.parent, self.child, self.direction
+ A, B, C = self.classes("A", "B", "C")
+ parent_class = {"a": A, "b": B, "c": C}[parent]
+ child_class = {"a": A, "b": B, "c": C}[child]
+
+ sess = create_session()
+
+ parent_obj = parent_class("parent1")
+ child_obj = child_class("child1")
+ somea = A("somea")
+ someb = B("someb")
+ somec = C("somec")
+
+
+ sess.add(parent_obj)
+ parent_obj.collection.append(child_obj)
+ if direction == ONETOMANY:
+ child2 = child_class("child2")
+ parent_obj.collection.append(child2)
+ sess.add(child2)
+ elif direction == MANYTOONE:
+ parent2 = parent_class("parent2")
+ parent2.collection.append(child_obj)
+ sess.add(parent2)
+ sess.add(somea)
+ sess.add(someb)
+ sess.add(somec)
+ sess.commit()
+ sess.close()
+
+ # assert result via direct get() of parent object
+ result = sess.get(parent_class, parent_obj.id)
+ assert result.id == parent_obj.id
+ assert result.collection[0].id == child_obj.id
+ if direction == ONETOMANY:
+ assert result.collection[1].id == child2.id
+ elif direction == MANYTOONE:
+ result2 = sess.get(parent_class, parent2.id)
+ assert result2.id == parent2.id
+ assert result2.collection[0].id == child_obj.id
+
+ sess.expunge_all()
+
+ # assert result via polymorphic load of parent object
+ result = sess.query(A).filter_by(id=parent_obj.id).one()
+ assert result.id == parent_obj.id
+ assert result.collection[0].id == child_obj.id
+ if direction == ONETOMANY:
+ assert result.collection[1].id == child2.id
+ elif direction == MANYTOONE:
+ result2 = sess.query(A).filter_by(id=parent2.id).one()
+ assert result2.id == parent2.id
+ assert result2.collection[0].id == child_obj.id
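The recurring pattern in the hunks above and below is the move from the legacy `Query.get()` to `Session.get()`. A minimal sketch of the new call, assuming SQLAlchemy 1.4+ (the `User` model here is illustrative, not from the patch):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as sess:
    sess.add(User(id=1, name="alice"))
    sess.commit()

    # Session.get() replaces the legacy Query.get(); it checks the
    # identity map first and only emits a SELECT on a miss
    loaded = sess.get(User, 1)
    name = loaded.name
    cached = sess.get(User, 1) is loaded
```

`Session.get()` also accepts `options=[...]` for loader options, which is why the `query(...).options(...).get(...)` chains in this diff collapse into a single call.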
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import testing
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import mapper
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm import column_property
from sqlalchemy.orm import contains_eager
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import join
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import mapper
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
+from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import with_polymorphic
from sqlalchemy.orm.interfaces import MANYTOONE
from sqlalchemy.testing import AssertsExecutionResults
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
session.flush()
session.expunge_all()
- p = session.query(Person).get(p.person_id)
- m = session.query(Manager).get(m.person_id)
+ p = session.get(Person, p.person_id)
+ m = session.get(Manager, m.person_id)
assert p.manager is m
def test_descendant_refs_parent(self):
session.flush()
session.expunge_all()
- p = session.query(Person).get(p.person_id)
- m = session.query(Manager).get(m.person_id)
+ p = session.get(Person, p.person_id)
+ m = session.get(Manager, m.person_id)
assert m.employee is p
if jointype == "join1":
poly_union = polymorphic_union(
{
- "person": people.select(
- people.c.type == "person"
- ).subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
"manager": join(
people,
managers,
elif jointype == "join2":
poly_union = polymorphic_union(
{
- "person": people.select(
- people.c.type == "person"
- ).subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
"manager": managers.join(
people, people.c.person_id == managers.c.person_id
),
sess.flush()
sess.expunge_all()
- p = sess.query(Person).get(p.person_id)
- m = sess.query(Manager).get(m.person_id)
+ p = sess.get(Person, p.person_id)
+ m = sess.get(Manager, m.person_id)
assert m.colleague is p
if usedata:
assert m.data.data == "ms data"
"manager": managers.join(
people, people.c.person_id == managers.c.person_id
),
- "person": people.select(
- people.c.type == "person"
- ).subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
},
None,
)
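The `polymorphic_union()` hunks above all make the same change: `table.select(whereclause)` is a removed legacy form, replaced by the generative `select().where()`. A minimal sketch, assuming SQLAlchemy 1.4+ (table and column names are illustrative):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table

metadata = MetaData()
people = Table(
    "people",
    metadata,
    Column("person_id", Integer, primary_key=True),
    Column("type", String(30)),
)

# legacy form, removed in 2.0: people.select(people.c.type == "person")
# 1.4+ form: criteria are applied generatively via .where()
stmt = people.select().where(people.c.type == "person")
compiled = str(stmt)
```

The `.subquery()` calls added throughout the diff follow from the same 1.4 change: a SELECT no longer implicitly converts to a subquery when used as a FROM element.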
managers,
people.c.person_id == managers.c.person_id,
),
- "person": people.select(
- people.c.type == "person"
- ).subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
},
None,
)
sess.flush()
sess.expunge_all()
- p = sess.query(Person).get(p.person_id)
- p2 = sess.query(Person).get(p2.person_id)
- p3 = sess.query(Person).get(p3.person_id)
- m = sess.query(Person).get(m.person_id)
+ p = sess.get(Person, p.person_id)
+ p2 = sess.get(Person, p2.person_id)
+ p3 = sess.get(Person, p3.person_id)
+ m = sess.get(Person, m.person_id)
assert len(p.colleagues) == 1
assert p.colleagues == [p2]
assert m.colleagues == [p3]
session.expunge_all()
def go():
- testcar = (
- session.query(Car)
- .options(joinedload("employee"))
- .get(car1.car_id)
+ testcar = session.get(
+ Car, car1.car_id, options=[joinedload("employee")]
)
assert str(testcar.employee) == "Engineer E4, status X"
self.assert_sql_count(testing.db, go, 1)
- car1 = session.query(Car).get(car1.car_id)
- usingGet = session.query(person_mapper).get(car1.owner)
+ car1 = session.get(Car, car1.car_id)
+ usingGet = session.get(person_mapper, car1.owner)
usingProperty = car1.employee
assert str(engineer4) == "Engineer E4, status X"
# and now for the lightning round, eager !
def go():
- testcar = (
- session.query(Car)
- .options(joinedload("employee"))
- .get(car1.car_id)
+ testcar = session.get(
+ Car, car1.car_id, options=[joinedload("employee")],
)
assert str(testcar.employee) == "Engineer E4, status X"
sess.flush()
sess.expunge_all()
- m = sess.query(Manager).get(m.person_id)
- m2 = sess.query(Manager).get(m2.person_id)
+ m = sess.get(Manager, m.person_id)
+ m2 = sess.get(Manager, m2.person_id)
assert m.colleague is m2
car_join = polymorphic_union(
{
"car": cars.outerjoin(offroad_cars)
- .select(offroad_cars.c.car_id == None)
+ .select()
+ .where(offroad_cars.c.car_id == None)
.reduce_columns()
.subquery(),
"offroad": cars.join(offroad_cars),
Status, Person, Engineer, Manager, Car = cls.classes(
"Status", "Person", "Engineer", "Manager", "Car"
)
- session = create_session(connection)
+ with sessionmaker(connection).begin() as session:
- active = Status(name="active")
- dead = Status(name="dead")
+ active = Status(name="active")
+ dead = Status(name="dead")
- session.add(active)
- session.add(dead)
- session.flush()
-
- # TODO: we haven't created assertions for all
- # the data combinations created here
+ session.add(active)
+ session.add(dead)
- # creating 5 managers named from M1 to M5
- # and 5 engineers named from E1 to E5
- # M4, M5, E4 and E5 are dead
- for i in range(1, 5):
- if i < 4:
- st = active
- else:
- st = dead
- session.add(
- Manager(name="M%d" % i, category="YYYYYYYYY", status=st)
- )
- session.add(Engineer(name="E%d" % i, field="X", status=st))
+ # TODO: we haven't created assertions for all
+ # the data combinations created here
- session.flush()
+ # creating 4 managers named M1 through M4
+ # and 4 engineers named E1 through E4
+ # M4 and E4 are dead
+ for i in range(1, 5):
+ if i < 4:
+ st = active
+ else:
+ st = dead
+ session.add(
+ Manager(name="M%d" % i, category="YYYYYYYYY", status=st)
+ )
+ session.add(Engineer(name="E%d" % i, field="X", status=st))
- # get E4
- engineer4 = session.query(Engineer).filter_by(name="E4").one()
+ # get E4
+ engineer4 = session.query(Engineer).filter_by(name="E4").one()
- # create 2 cars for E4, one active and one dead
- car1 = Car(employee=engineer4, status=active)
- car2 = Car(employee=engineer4, status=dead)
- session.add(car1)
- session.add(car2)
- session.flush()
+ # create 2 cars for E4, one active and one dead
+ car1 = Car(employee=engineer4, status=active)
+ car2 = Car(employee=engineer4, status=dead)
+ session.add(car1)
+ session.add(car2)
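The fixture rewrite above swaps `create_session()` plus manual `flush()` calls for the `sessionmaker.begin()` context manager, new in 1.4, which commits on successful exit. A minimal sketch under that assumption (the `Status` model is illustrative):

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Status(Base):
    __tablename__ = "status"
    id = Column(Integer, primary_key=True)
    name = Column(String(30))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

# sessionmaker.begin() opens a Session that commits when the block
# exits cleanly (and rolls back on error), so the explicit
# flush()/commit() calls in the old fixture drop away
with sessionmaker(engine).begin() as session:
    session.add(Status(id=1, name="active"))

with sessionmaker(engine)() as session:
    name = session.get(Status, 1).name
```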
def test_join_to_q_person(self):
Status, Person, Engineer, Manager, Car = self.classes(
)
session = create_session()
r = session.query(Person).filter(
- exists([1], Car.owner == Person.person_id)
+ exists().where(Car.owner == Person.person_id)
)
eq_(
.where(table_Employee.c.atype == "Engineer")
.select_from(table_Employee.join(table_Engineer))
.subquery(),
- "Employee": table_Employee.select(
- table_Employee.c.atype == "Employee"
- ).subquery(),
+ "Employee": table_Employee.select()
+ .where(table_Employee.c.atype == "Employee")
+ .subquery(),
},
None,
"pu_employee",
item_join = polymorphic_union(
{
- "BaseItem": base_item_table.select(
- base_item_table.c.child_name == "BaseItem"
- ).subquery(),
+ "BaseItem": base_item_table.select()
+ .where(base_item_table.c.child_name == "BaseItem")
+ .subquery(),
"Item": base_item_table.join(item_table),
},
None,
# a 2-col pk in any case but the leading select has a NULL for the
# "t2id" column
d = util.OrderedDict()
- d["t1"] = t1.select(t1.c.type == "t1").subquery()
+ d["t1"] = t1.select().where(t1.c.type == "t1").subquery()
d["t2"] = t1.join(t2)
pjoin = polymorphic_union(d, None, "pjoin")
# query using get(), using only one value.
# this requires the select_table mapper
# has the same single-col primary key.
- assert sess.query(T1).get(ot1.id).id == ot1.id
+ assert sess.get(T1, ot1.id).id == ot1.id
- ot1 = sess.query(T1).get(ot1.id)
+ ot1 = sess.get(T1, ot1.id)
ot1.data = "hi"
sess.flush()
# a 2-col pk in any case but the leading select has a NULL for the
# "t2id" column
d = util.OrderedDict()
- d["t1"] = t1.select(t1.c.type == "t1").subquery()
+ d["t1"] = t1.select().where(t1.c.type == "t1").subquery()
d["t2"] = t1.join(t2)
pjoin = polymorphic_union(d, None, "pjoin")
# query using get(), using only one value. this requires the
# select_table mapper
# has the same single-col primary key.
- assert sess.query(T1).get(ot1.id).id == ot1.id
+ assert sess.get(T1, ot1.id).id == ot1.id
- ot1 = sess.query(T1).get(ot1.id)
+ ot1 = sess.get(T1, ot1.id)
ot1.data = "hi"
sess.flush()
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import column_property
from sqlalchemy.orm import composite
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import deferred
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.orm import joinedload
from sqlalchemy.testing.assertsql import Conditional
from sqlalchemy.testing.assertsql import Or
from sqlalchemy.testing.assertsql import RegexSQL
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
child_id = Column(Integer, ForeignKey("a.id"))
child = relationship("A")
- p_a = case([(discriminator == "a", "a")], else_="b")
+ p_a = case((discriminator == "a", "a"), else_="b")
__mapper_args__ = {
"polymorphic_identity": "a",
def test_polymorphic_on_expr_explicit_map(self):
t2, t1 = self.tables.t2, self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child"))
mapper(
Parent,
t1,
def test_polymorphic_on_expr_implicit_map_no_label_joined(self):
t2, t1 = self.tables.t2, self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child"))
mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr)
mapper(Child, t2, inherits=Parent, polymorphic_identity="child")
def test_polymorphic_on_expr_implicit_map_w_label_joined(self):
t2, t1 = self.tables.t2, self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case(
- [(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")]
- ).label(None)
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")).label(
+ None
+ )
mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr)
mapper(Child, t2, inherits=Parent, polymorphic_identity="child")
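The `case()` edits in the surrounding tests reflect the 1.4 signature change: WHEN criteria are passed as positional tuples rather than inside a list. A minimal sketch, assuming SQLAlchemy 1.4+:

```python
from sqlalchemy import case, column

x = column("x")

# 1.3 and earlier: case([(x == "p", "parent")], else_="child")
# 1.4+: each (condition, value) tuple is passed positionally
expr = case((x == "p", "parent"), (x == "c", "child"), else_="other")
rendered = str(expr)
```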
with a standalone expr"""
t1 = self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child"))
mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr)
mapper(Child, inherits=Parent, polymorphic_identity="child")
with a standalone expr"""
t1 = self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case(
- [(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")]
- ).label(None)
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child")).label(
+ None
+ )
mapper(Parent, t1, polymorphic_identity="parent", polymorphic_on=expr)
mapper(Child, inherits=Parent, polymorphic_identity="child")
def test_polymorphic_on_column_prop(self):
t2, t1 = self.tables.t2, self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child"))
cprop = column_property(expr)
mapper(
Parent,
def test_polymorphic_on_column_str_prop(self):
t2, t1 = self.tables.t2, self.tables.t1
Parent, Child = self.classes.Parent, self.classes.Child
- expr = case([(t1.c.x == "p", "parent"), (t1.c.x == "c", "child")])
+ expr = case((t1.c.x == "p", "parent"), (t1.c.x == "c", "child"))
cprop = column_property(expr)
mapper(
Parent,
class Blub(Bar):
pass
- def test_get_polymorphic(self):
- self._do_get_test(True)
-
- def test_get_nonpolymorphic(self):
- self._do_get_test(False)
-
- def _do_get_test(self, polymorphic):
+ @testing.combinations(
+ ("polymorphic", True), ("test_get_nonpolymorphic", False), id_="ia"
+ )
+ def test_get(self, polymorphic):
foo, Bar, Blub, blub, bar, Foo = (
self.tables.foo,
self.classes.Bar,
if polymorphic:
def go():
- assert sess.query(Foo).get(f.id) is f
- assert sess.query(Foo).get(b.id) is b
- assert sess.query(Foo).get(bl.id) is bl
- assert sess.query(Bar).get(b.id) is b
- assert sess.query(Bar).get(bl.id) is bl
- assert sess.query(Blub).get(bl.id) is bl
+ assert sess.get(Foo, f.id) is f
+ assert sess.get(Foo, b.id) is b
+ assert sess.get(Foo, bl.id) is bl
+ assert sess.get(Bar, b.id) is b
+ assert sess.get(Bar, bl.id) is bl
+ assert sess.get(Blub, bl.id) is bl
# test class mismatches - item is present
# in the identity map but we requested a subclass
- assert sess.query(Blub).get(f.id) is None
- assert sess.query(Blub).get(b.id) is None
- assert sess.query(Bar).get(f.id) is None
+ assert sess.get(Blub, f.id) is None
+ assert sess.get(Blub, b.id) is None
+ assert sess.get(Bar, f.id) is None
self.assert_sql_count(testing.db, go, 0)
else:
# polymorphic. the important part being that get() always
# returns an instance of the query's type.
def go():
- assert sess.query(Foo).get(f.id) is f
+ assert sess.get(Foo, f.id) is f
- bb = sess.query(Foo).get(b.id)
+ bb = sess.get(Foo, b.id)
assert isinstance(bb, Foo) and bb.id == b.id
- bll = sess.query(Foo).get(bl.id)
+ bll = sess.get(Foo, bl.id)
assert isinstance(bll, Foo) and bll.id == bl.id
- assert sess.query(Bar).get(b.id) is b
+ assert sess.get(Bar, b.id) is b
- bll = sess.query(Bar).get(bl.id)
+ bll = sess.get(Bar, bl.id)
assert isinstance(bll, Bar) and bll.id == bl.id
- assert sess.query(Blub).get(bl.id) is bl
+ assert sess.get(Blub, bl.id) is bl
self.assert_sql_count(testing.db, go, 3)
@classmethod
def define_tables(cls, metadata):
- global foo, bar, bar_foo
- foo = Table(
+ Table(
"foo",
metadata,
Column(
),
Column("data", String(30)),
)
- bar = Table(
+ Table(
"bar",
metadata,
Column("id", Integer, ForeignKey("foo.id"), primary_key=True),
Column("bar_data", String(30)),
)
- bar_foo = Table(
+ Table(
"bar_foo",
metadata,
Column("bar_id", Integer, ForeignKey("bar.id")),
Column("foo_id", Integer, ForeignKey("foo.id")),
)
- def test_basic(self):
- class Foo(object):
+ @classmethod
+ def setup_mappers(cls):
+ foo, bar, bar_foo = cls.tables("foo", "bar", "bar_foo")
+
+ class Foo(cls.Comparable):
pass
class Bar(Foo):
"eager", relationship(foos, bar_foo, lazy="joined", viewonly=True)
)
- foo.insert().execute(data="foo1")
- bar.insert().execute(id=1, data="bar1")
+ @classmethod
+ def insert_data(cls, connection):
+ foo, bar, bar_foo = cls.tables("foo", "bar", "bar_foo")
+
+ connection.execute(foo.insert(), dict(data="foo1"))
+ connection.execute(bar.insert(), dict(id=1, data="bar1"))
+
+ connection.execute(foo.insert(), dict(data="foo2"))
+ connection.execute(bar.insert(), dict(id=2, data="bar2"))
- foo.insert().execute(data="foo2")
- bar.insert().execute(id=2, data="bar2")
+ connection.execute(foo.insert(), dict(data="foo3")) # 3
+ connection.execute(foo.insert(), dict(data="foo4")) # 4
- foo.insert().execute(data="foo3") # 3
- foo.insert().execute(data="foo4") # 4
+ connection.execute(bar_foo.insert(), dict(bar_id=1, foo_id=3))
+ connection.execute(bar_foo.insert(), dict(bar_id=2, foo_id=4))
- bar_foo.insert().execute(bar_id=1, foo_id=3)
- bar_foo.insert().execute(bar_id=2, foo_id=4)
+ def test_basic(self):
+ Bar = self.classes.Bar
sess = create_session()
q = sess.query(Bar)
sess.add(a)
sess.flush()
- eq_(select(func.count("*")).select_from(user_roles).scalar(), 1)
+ eq_(sess.scalar(select(func.count("*")).select_from(user_roles)), 1)
def test_two(self):
admins, users, roles, user_roles = (
a.password = "sadmin"
sess.flush()
- eq_(select(func.count("*")).select_from(user_roles).scalar(), 1)
+ eq_(sess.scalar(select(func.count("*")).select_from(user_roles)), 1)
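The two hunks above stop calling `.scalar()` on the statement itself; in 1.4/2.0 statements no longer self-execute and must be handed to a `Session` or `Connection`. A minimal sketch using `Connection.scalar()` (the diff uses `Session.scalar()`, which works the same way; the table here is illustrative):

```python
from sqlalchemy import (
    Column,
    Integer,
    MetaData,
    Table,
    create_engine,
    func,
    select,
)

metadata = MetaData()
user_roles = Table(
    "user_roles", metadata, Column("id", Integer, primary_key=True)
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(user_roles.insert(), [{"id": 1}])

# legacy: select(...).select_from(user_roles).scalar()
# 1.4+: pass the statement to Connection.scalar() / Session.scalar()
with engine.connect() as conn:
    count = conn.scalar(select(func.count()).select_from(user_roles))
```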
class PassiveDeletesTest(fixtures.MappedTest):
def _do_test(self, composite):
session = create_session()
- query = session.query(Employee)
if composite:
- alice1 = query.get([1, 2])
- bob = query.get([2, 3])
- alice2 = query.get([1, 2])
+ alice1 = session.get(Employee, [1, 2])
+ bob = session.get(Employee, [2, 3])
+ alice2 = session.get(Employee, [1, 2])
else:
- alice1 = query.get(1)
- bob = query.get(2)
- alice2 = query.get(1)
+ alice1 = session.get(Employee, 1)
+ bob = session.get(Employee, 2)
+ alice2 = session.get(Employee, 1)
assert alice1.name == alice2.name == "alice"
assert bob.name == "bob"
Column("data3", String(128)),
)
- def test_joins(self):
- for j1 in (
- None,
- _b_table.c.a_id == _a_table.c.id,
- _a_table.c.id == _b_table.c.a_id,
- ):
- for j2 in (
- None,
- _b_table.c.a_id == _c_table.c.b_a_id,
- _c_table.c.b_a_id == _b_table.c.a_id,
- ):
- self._do_test(j1, j2)
- for t in reversed(_a_table.metadata.sorted_tables):
- t.delete().execute().close()
-
- def _do_test(self, j1, j2):
+ @testing.combinations(
+ lambda _a_table, _b_table: None,
+ lambda _a_table, _b_table: _b_table.c.a_id == _a_table.c.id,
+ lambda _a_table, _b_table: _a_table.c.id == _b_table.c.a_id,
+ argnames="j1",
+ )
+ @testing.combinations(
+ lambda _b_table, _c_table: None,
+ lambda _b_table, _c_table: _b_table.c.a_id == _c_table.c.b_a_id,
+ lambda _b_table, _c_table: _c_table.c.b_a_id == _b_table.c.a_id,
+ argnames="j2",
+ )
+ def test_joins(self, j1, j2):
+ _a_table, _b_table, _c_table = self.tables("a", "b", "c")
+ j1 = testing.resolve_lambda(j1, **locals())
+ j2 = testing.resolve_lambda(j2, **locals())
+
class A(object):
def __init__(self, **kwargs):
for key, value in list(kwargs.items()):
sess = create_session()
sess.add(s1)
sess.flush()
- assert sess.query(Sub).get(10) is s1
+ assert sess.get(Sub, 10) is s1
def test_override_onlyinparent(self):
class Base(object):
sess.flush()
# s1 gets '10'
- assert sess.query(Sub).get(10) is s1
+ assert sess.get(Sub, 10) is s1
# s2 gets a new id, base_id is overwritten by the ultimate
# PK col
sess.flush()
sess.expunge_all()
- assert sess.query(Base).get(b1.base_id).subdata == "this is base"
- assert sess.query(Sub).get(s1.base_id).subdata == "this is sub"
+ assert sess.get(Base, b1.base_id).subdata == "this is base"
+ assert sess.get(Sub, s1.base_id).subdata == "this is sub"
def test_base_descriptors_over_base_cols(self):
class Base(object):
sess.flush()
sess.expunge_all()
- assert sess.query(Base).get(b1.base_id).data == "this is base"
- assert sess.query(Sub).get(s1.base_id).data == "this is base"
+ assert sess.get(Base, b1.base_id).data == "this is base"
+ assert sess.get(Sub, s1.base_id).data == "this is base"
class OptimizedLoadTest(fixtures.MappedTest):
polymorphic_identity="sub",
with_polymorphic=(
"*",
- base.outerjoin(sub).select(use_labels=True).alias("foo"),
+ base.outerjoin(sub).select().apply_labels().alias("foo"),
),
)
sess = Session()
sess.flush()
f_id = f.id
sess.expunge_all()
- assert sess.query(Content).get(f_id).content_type == "bar"
+ assert sess.get(Content, f_id).content_type == "bar"
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import configure_mappers
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import mapper
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
eq_(test_calls.mock_calls, [mock.call.engineer_info_instance()])
session.add(tom)
- session.flush()
+ session.commit()
session.close()
assert (
len(
- testing.db.execute(
- session.query(Employee).with_labels().statement
- ).fetchall()
+ session.connection()
+ .execute(session.query(Employee).with_labels().statement)
+ .fetchall()
)
== 3
)
session.flush()
eq_(
len(
- testing.db.execute(
+ session.connection()
+ .execute(
session.query(Employee)
.with_polymorphic("*", pjoin, pjoin.c.type)
.with_labels()
.statement
- ).fetchall()
+ )
+ .fetchall()
),
4,
)
- eq_(session.query(Employee).get(jdoe.employee_id), jdoe)
- eq_(session.query(Engineer).get(jerry.employee_id), jerry)
+ eq_(session.get(Employee, jdoe.employee_id), jdoe)
+ eq_(session.get(Engineer, jerry.employee_id), jerry)
eq_(
set(
[
# test adaption of the column by wrapping the query in a
# subquery
- eq_(
- len(
- testing.db.execute(
- session.query(Engineer)
- .with_polymorphic("*", pjoin2, pjoin2.c.type)
- .from_self()
- .statement
- ).fetchall()
- ),
- 2,
- )
- eq_(
- set(
- [
- repr(x)
- for x in session.query(Engineer)
- .with_polymorphic("*", pjoin2, pjoin2.c.type)
- .from_self()
- ]
- ),
- set(
- [
- "Engineer Jerry knows how to program",
- "Hacker Kurt 'Badass' knows how to hack",
- ]
- ),
- )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ eq_(
+ len(
+ session.connection()
+ .execute(
+ session.query(Engineer)
+ .with_polymorphic("*", pjoin2, pjoin2.c.type)
+ .from_self()
+ .statement
+ )
+ .fetchall()
+ ),
+ 2,
+ )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ eq_(
+ set(
+ [
+ repr(x)
+ for x in session.query(Engineer)
+ .with_polymorphic("*", pjoin2, pjoin2.c.type)
+ .from_self()
+ ]
+ ),
+ set(
+ [
+ "Engineer Jerry knows how to program",
+ "Hacker Kurt 'Badass' knows how to hack",
+ ]
+ ),
+ )
def test_relationship(self):
pjoin = polymorphic_union(
session.expunge_all()
def go():
- c2 = session.query(Company).get(c.id)
+ c2 = session.get(Company, c.id)
assert set([repr(x) for x in c2.employees]) == set(
[
"Engineer Kurt knows how to hack",
session.expunge_all()
def go():
- c2 = (
- session.query(Company)
- .options(joinedload(Company.employees))
- .get(c.id)
+ c2 = session.get(
+ Company, c.id, options=[joinedload(Company.employees)]
)
assert set([repr(x) for x in c2.employees]) == set(
[
def insert_data(cls, connection):
connection.execute(
refugees_table.insert(),
- dict(refugee_fid=1, name="refugee1"),
- dict(refugee_fid=2, name="refugee2"),
+ [
+ dict(refugee_fid=1, name="refugee1"),
+ dict(refugee_fid=2, name="refugee2"),
+ ],
)
connection.execute(
offices_table.insert(),
- dict(office_fid=1, name="office1"),
- dict(office_fid=2, name="office2"),
+ [
+ dict(office_fid=1, name="office1"),
+ dict(office_fid=2, name="office2"),
+ ],
)
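The `insert_data` hunk above reflects the 1.4 executemany calling convention: multiple parameter sets go to `Connection.execute()` as a single list, not as separate positional dictionaries. A minimal sketch under that assumption:

```python
from sqlalchemy import (
    Column,
    Integer,
    MetaData,
    String,
    Table,
    create_engine,
    select,
)

metadata = MetaData()
refugees = Table(
    "refugees",
    metadata,
    Column("refugee_fid", Integer, primary_key=True),
    Column("name", String(30)),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    # one list argument -> DBAPI executemany; the old
    # execute(stmt, dict_a, dict_b) form is removed
    conn.execute(
        refugees.insert(),
        [
            dict(refugee_fid=1, name="refugee1"),
            dict(refugee_fid=2, name="refugee2"),
        ],
    )

with engine.connect() as conn:
    rows = conn.execute(
        select(refugees).order_by(refugees.c.refugee_fid)
    ).fetchall()
```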
def test_keys(self):
polymorphic_identity="refugee",
)
sess = create_session()
- eq_(sess.query(Refugee).get(1).name, "refugee1")
- eq_(sess.query(Refugee).get(2).name, "refugee2")
- eq_(sess.query(Office).get(1).name, "office1")
- eq_(sess.query(Office).get(2).name, "office2")
+ eq_(sess.get(Refugee, 1).name, "refugee1")
+ eq_(sess.get(Refugee, 2).name, "refugee2")
+ eq_(sess.get(Office, 1).name, "office1")
+ eq_(sess.get(Office, 2).name, "office2")
"c": self.tables.page.join(self.tables.magazine_page).join(
self.tables.classified_page
),
- "p": self.tables.page.select(
- self.tables.page.c.type == "p"
- ).subquery(),
+ "p": self.tables.page.select()
+ .where(self.tables.page.c.type == "p")
+ .subquery(),
},
None,
"page_join",
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.orm import class_mapper
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
class InheritTest(fixtures.MappedTest):
# test that "bar.bid" does not need to be referenced in a get
# (ticket 185)
- assert sess.query(Bar).get(b.id).id == b.id
+ assert sess.get(Bar, b.id).id == b.id
def test_basic(self):
class Foo(object):
from sqlalchemy.orm import backref
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import configure_mappers
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import mapper
from sqlalchemy.orm import polymorphic_union
from sqlalchemy.orm import relationship
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
{
"engineer": people.join(engineers),
"manager": people.join(managers),
- "person": people.select(people.c.type == "person").subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
},
None,
"pjoin",
session.add(c)
session.flush()
session.expunge_all()
- eq_(session.query(Company).get(c.company_id), c)
+ eq_(session.get(Company, c.company_id), c)
@testing.combinations(
{
"engineer": people.join(engineers),
"manager": people.join(managers),
- "person": people.select(
- people.c.type == "person"
- ).subquery(),
+ "person": people.select()
+ .where(people.c.type == "person")
+ .subquery(),
},
None,
"pjoin",
employees = session.query(Person).order_by(Person.person_id).all()
company = session.query(Company).first()
- eq_(session.query(Person).get(dilbert.person_id), dilbert)
+ eq_(session.get(Person, dilbert.person_id), dilbert)
session.expunge_all()
eq_(
session.expunge_all()
def go():
- cc = session.query(Company).get(company.company_id)
+ cc = session.get(Company, company.company_id)
eq_(cc.employees, employees)
if not lazy_relationship:
# the "palias" alias does *not* get sucked up
# into the "person_join" conversion.
palias = people.alias("palias")
- dilbert = session.query(Person).get(dilbert.person_id)
+ dilbert = session.get(Person, dilbert.person_id)
is_(
dilbert,
session.query(Person)
from sqlalchemy import testing
from sqlalchemy import true
from sqlalchemy.orm import aliased
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import defaultload
from sqlalchemy.orm import join
from sqlalchemy.orm import joinedload
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import eq_
from sqlalchemy.testing.assertsql import CompiledSQL
+from sqlalchemy.testing.fixtures import create_session
from ._poly_fixtures import _Polymorphic
from ._poly_fixtures import _PolymorphicAliasedJoins
from ._poly_fixtures import _PolymorphicJoins
self.assert_sql_count(testing.db, go, 3)
eq_(
- select(func.count("*"))
- .select_from(
- sess.query(Person)
- .with_polymorphic("*")
- .options(joinedload(Engineer.machines))
- .order_by(Person.person_id)
- .limit(2)
- .offset(1)
- .with_labels()
- .subquery()
- )
- .scalar(),
+ sess.scalar(
+ select(func.count("*")).select_from(
+ sess.query(Person)
+ .with_polymorphic("*")
+ .options(joinedload(Engineer.machines))
+ .order_by(Person.person_id)
+ .limit(2)
+ .offset(1)
+ .with_labels()
+ .subquery()
+ )
+ ),
2,
)
"""
sess = create_session()
eq_(
- sess.query(Person).get(e1.person_id),
+ sess.get(Person, e1.person_id),
Engineer(name="dilbert", primary_language="java"),
)
def test_get_two(self):
sess = create_session()
eq_(
- sess.query(Engineer).get(e1.person_id),
+ sess.get(Engineer, e1.person_id),
Engineer(name="dilbert", primary_language="java"),
)
def test_get_three(self):
sess = create_session()
eq_(
- sess.query(Manager).get(b1.person_id),
+ sess.get(Manager, b1.person_id),
Boss(name="pointy haired boss", golf_swing="fore"),
)
)
def test_multi_join_future(self):
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
e = aliased(Person)
c = aliased(Company)
eq_(sess.query(Engineer).all()[0], Engineer(name="dilbert"))
def test_filter_on_subclass_one_future(self):
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
eq_(
sess.execute(select(Engineer)).scalar(), Engineer(name="dilbert"),
)
)
def test_join_from_polymorphic_nonaliased_one_future(self):
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
eq_(
sess.execute(
select(Person)
)
def test_join_from_polymorphic_flag_aliased_one_future(self):
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
pa = aliased(Paperwork)
eq_(
)
def test_join_from_with_polymorphic_nonaliased_one_future(self):
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
pm = with_polymorphic(Person, [Manager])
eq_(
palias = aliased(Person)
expected = [(m1, e1), (m1, e2), (m1, b1)]
- eq_(
- sess.query(Person, palias)
- .filter(Person.company_id == palias.company_id)
- .filter(Person.name == "dogbert")
- .filter(Person.person_id > palias.person_id)
- .from_self()
- .order_by(Person.person_id, palias.person_id)
- .all(),
- expected,
- )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ eq_(
+ sess.query(Person, palias)
+ .filter(Person.company_id == palias.company_id)
+ .filter(Person.name == "dogbert")
+ .filter(Person.person_id > palias.person_id)
+ .from_self()
+ .order_by(Person.person_id, palias.person_id)
+ .all(),
+ expected,
+ )
def test_self_referential_two_point_five(self):
"""Using two aliases, the above case works.
expected = [(m1, e1), (m1, e2), (m1, b1)]
- eq_(
- sess.query(palias, palias2)
- .filter(palias.company_id == palias2.company_id)
- .filter(palias.name == "dogbert")
- .filter(palias.person_id > palias2.person_id)
- .from_self()
- .order_by(palias.person_id, palias2.person_id)
- .all(),
- expected,
- )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ eq_(
+ sess.query(palias, palias2)
+ .filter(palias.company_id == palias2.company_id)
+ .filter(palias.name == "dogbert")
+ .filter(palias.person_id > palias2.person_id)
+ .from_self()
+ .order_by(palias.person_id, palias2.person_id)
+ .all(),
+ expected,
+ )
def test_self_referential_two_future(self):
# TODO: this is the SECOND test *EVER* of an aliased class of
# an aliased class.
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
expected = [(m1, e1), (m1, e2), (m1, b1)]
# not aliasing the first class
# TODO: this is the first test *EVER* of an aliased class of
# an aliased class. we should add many more tests for this.
# new case added in Id810f485c5f7ed971529489b84694e02a3356d6d
- sess = create_session(testing.db, future=True)
+ sess = create_session(future=True)
expected = [(m1, e1), (m1, e2), (m1, b1)]
# aliasing the first class
from sqlalchemy import LargeBinary
from sqlalchemy import String
from sqlalchemy.orm import backref
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import deferred
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.orm import backref
from sqlalchemy.orm import configure_mappers
from sqlalchemy.orm import contains_eager
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
+from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import subqueryload
from sqlalchemy.orm import with_polymorphic
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing.entities import ComparableEntity
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
e4 = Engineer(name="e4")
org1 = Organization(name="org1", engineers=[e1, e2])
org2 = Organization(name="org2", engineers=[e3, e4])
- sess = create_session(connection)
- sess.add(org1)
- sess.add(org2)
- sess.flush()
+ with sessionmaker(connection).begin() as sess:
+ sess.add(org1)
+ sess.add(org2)
def test_not_contains(self):
Organization = self.classes.Organization
# another way to check
eq_(
- select(func.count("*"))
- .select_from(q.limit(1).with_labels().subquery())
- .scalar(),
+ sess.scalar(
+ select(func.count("*")).select_from(
+ q.limit(1).with_labels().subquery()
+ )
+ ),
1,
)
assert q.first() is c1
global p1, p2
Sub, Subparent = cls.classes.Sub, cls.classes.Subparent
- sess = create_session(connection)
- p1 = Subparent(
- data="p1",
- children=[Sub(data="s1"), Sub(data="s2"), Sub(data="s3")],
- )
- p2 = Subparent(data="p2", children=[Sub(data="s4"), Sub(data="s5")])
- sess.add(p1)
- sess.add(p2)
- sess.flush()
+ with sessionmaker(connection).begin() as sess:
+ p1 = Subparent(
+ data="p1",
+ children=[Sub(data="s1"), Sub(data="s2"), Sub(data="s3")],
+ )
+ p2 = Subparent(
+ data="p2", children=[Sub(data="s4"), Sub(data="s5")]
+ )
+ sess.add(p1)
+ sess.add(p2)
def test_joinedload(self):
Subparent = self.classes.Subparent
"JOIN ep2 ON base2.id = ep2.base2_id",
)
- def test_six(self):
+ def test_six_legacy(self):
Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes()
s = Session()
# this query is coming out instead which is equivalent, but not
# totally sure where this happens
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ self.assert_compile(
+ s.query(Sub2).from_self().join(Sub2.ep1).join(Sub2.ep2),
+ "SELECT anon_1.sub2_id AS anon_1_sub2_id, "
+ "anon_1.base2_base1_id AS anon_1_base2_base1_id, "
+ "anon_1.base2_data AS anon_1_base2_data, "
+ "anon_1.sub2_subdata AS anon_1_sub2_subdata "
+ "FROM (SELECT sub2.id AS sub2_id, base2.id AS base2_id, "
+ "base2.base1_id AS base2_base1_id, base2.data AS base2_data, "
+ "sub2.subdata AS sub2_subdata "
+ "FROM base2 JOIN sub2 ON base2.id = sub2.id) AS anon_1 "
+ "JOIN ep1 ON anon_1.sub2_id = ep1.base2_id "
+ "JOIN ep2 ON anon_1.sub2_id = ep2.base2_id",
+ )
+
+ def test_six(self):
+ Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes()
+
+ # as of from_self() changing in
+ # I3abfb45dd6e50f84f29d39434caa0b550ce27864,
+ # this query is coming out instead which is equivalent, but not
+ # totally sure where this happens
+
+ stmt = select(Sub2)
+
+ subq = aliased(Sub2, stmt.apply_labels().subquery())
+
+ stmt = select(subq).join(subq.ep1).join(subq.ep2).apply_labels()
self.assert_compile(
- s.query(Sub2).from_self().join(Sub2.ep1).join(Sub2.ep2),
+ stmt,
"SELECT anon_1.sub2_id AS anon_1_sub2_id, "
"anon_1.base2_base1_id AS anon_1_base2_base1_id, "
"anon_1.base2_data AS anon_1_base2_data, "
"JOIN ep2 ON anon_1.sub2_id = ep2.base2_id",
)
- def test_seven(self):
+ def test_seven_legacy(self):
Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes()
s = Session()
# I3abfb45dd6e50f84f29d39434caa0b550ce27864,
# this query is coming out instead which is equivalent, but not
# totally sure where this happens
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+
+ self.assert_compile(
+ # adding Sub2 to the entities list helps it,
+ # otherwise the joins for Sub2.ep1/ep2 don't have columns
+ # to latch onto. Can't really make it better than this
+ s.query(Parent, Sub2)
+ .join(Parent.sub1)
+ .join(Sub1.sub2)
+ .from_self()
+ .join(Sub2.ep1)
+ .join(Sub2.ep2),
+ "SELECT anon_1.parent_id AS anon_1_parent_id, "
+ "anon_1.parent_data AS anon_1_parent_data, "
+ "anon_1.sub2_id AS anon_1_sub2_id, "
+ "anon_1.base2_base1_id AS anon_1_base2_base1_id, "
+ "anon_1.base2_data AS anon_1_base2_data, "
+ "anon_1.sub2_subdata AS anon_1_sub2_subdata "
+ "FROM (SELECT parent.id AS parent_id, "
+ "parent.data AS parent_data, "
+ "sub2.id AS sub2_id, "
+ "base2.id AS base2_id, "
+ "base2.base1_id AS base2_base1_id, "
+ "base2.data AS base2_data, "
+ "sub2.subdata AS sub2_subdata "
+ "FROM parent JOIN (base1 JOIN sub1 ON base1.id = sub1.id) "
+ "ON parent.id = sub1.parent_id JOIN "
+ "(base2 JOIN sub2 ON base2.id = sub2.id) "
+ "ON base1.id = base2.base1_id) AS anon_1 "
+ "JOIN ep1 ON anon_1.sub2_id = ep1.base2_id "
+ "JOIN ep2 ON anon_1.sub2_id = ep2.base2_id",
+ )
+
+ def test_seven(self):
+ Parent, Base1, Base2, Sub1, Sub2, EP1, EP2 = self._classes()
+
+ # as of from_self() changing in
+ # I3abfb45dd6e50f84f29d39434caa0b550ce27864,
+ # this query is coming out instead which is equivalent, but not
+ # totally sure where this happens
+
+ subq = (
+ select(Parent, Sub2)
+ .join(Parent.sub1)
+ .join(Sub1.sub2)
+ .apply_labels()
+ .subquery()
+ )
+
+ # another 1.4 supercharged select() statement ;)
+
+ palias = aliased(Parent, subq)
+ sub2alias = aliased(Sub2, subq)
+
+ stmt = (
+ select(palias, sub2alias)
+ .join(sub2alias.ep1)
+ .join(sub2alias.ep2)
+ .apply_labels()
+ )
+
self.assert_compile(
# adding Sub2 to the entities list helps it,
# otherwise the joins for Sub2.ep1/ep2 don't have columns
# to latch onto. Can't really make it better than this
- s.query(Parent, Sub2)
- .join(Parent.sub1)
- .join(Sub1.sub2)
- .from_self()
- .join(Sub2.ep1)
- .join(Sub2.ep2),
+ stmt,
"SELECT anon_1.parent_id AS anon_1_parent_id, "
"anon_1.parent_data AS anon_1_parent_data, "
"anon_1.sub2_id AS anon_1_sub2_id, "
from sqlalchemy import true
from sqlalchemy.orm import aliased
from sqlalchemy.orm import Bundle
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertsql import CompiledSQL
+from sqlalchemy.testing.fixtures import create_session
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
[e2id],
)
- def test_from_self(self):
+ def test_from_self_legacy(self):
Engineer = self.classes.Engineer
sess = create_session()
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ self.assert_compile(
+ sess.query(Engineer).from_self(),
+ "SELECT anon_1.employees_employee_id AS "
+ "anon_1_employees_employee_id, "
+ "anon_1.employees_name AS "
+ "anon_1_employees_name, "
+ "anon_1.employees_manager_data AS "
+ "anon_1_employees_manager_data, "
+ "anon_1.employees_engineer_info AS "
+ "anon_1_employees_engineer_info, "
+ "anon_1.employees_type AS "
+ "anon_1_employees_type FROM (SELECT "
+ "employees.employee_id AS "
+ "employees_employee_id, employees.name AS "
+ "employees_name, employees.manager_data AS "
+ "employees_manager_data, "
+ "employees.engineer_info AS "
+ "employees_engineer_info, employees.type "
+ "AS employees_type FROM employees WHERE "
+ "employees.type IN ([POSTCOMPILE_type_1])) AS "
+ "anon_1",
+ use_default_dialect=True,
+ )
+
+ def test_from_subq(self):
+ Engineer = self.classes.Engineer
+
+ stmt = select(Engineer)
+ subq = aliased(Engineer, stmt.apply_labels().subquery())
+
+ # so here we have an extra "WHERE type in ()", because
+ # both the inner and the outer queries have the Engineer entity.
+ # this is expected at the moment but it would be nice if
+ # _enable_single_crit or something similar could propagate here.
+ # legacy from_self() takes care of this because it applies
+ # _enable_single_crit at that moment.
+
+ stmt = select(subq).apply_labels()
self.assert_compile(
- sess.query(Engineer).from_self(),
+ stmt,
"SELECT anon_1.employees_employee_id AS "
"anon_1_employees_employee_id, "
"anon_1.employees_name AS "
"employees_engineer_info, employees.type "
"AS employees_type FROM employees WHERE "
"employees.type IN ([POSTCOMPILE_type_1])) AS "
- "anon_1",
+ "anon_1 WHERE anon_1.employees_type IN ([POSTCOMPILE_type_2])",
use_default_dialect=True,
)
sess = create_session()
col = func.count(literal_column("*"))
- self.assert_compile(
- sess.query(Engineer.employee_id).from_self(col),
- "SELECT count(*) AS count_1 "
- "FROM (SELECT employees.employee_id AS employees_employee_id "
- "FROM employees "
- "WHERE employees.type IN ([POSTCOMPILE_type_1])) AS anon_1",
- use_default_dialect=True,
- )
+ with testing.expect_deprecated(
+ r"The Query.from_self\(\) function/method"
+ ):
+ self.assert_compile(
+ sess.query(Engineer.employee_id).from_self(col),
+ "SELECT count(*) AS count_1 "
+ "FROM (SELECT employees.employee_id AS employees_employee_id "
+ "FROM employees "
+ "WHERE employees.type IN ([POSTCOMPILE_type_1])) AS anon_1",
+ use_default_dialect=True,
+ )
def test_select_from_count(self):
Manager, Engineer = (self.classes.Manager, self.classes.Engineer)
class DeclarativeBind(fixtures.TestBase):
def test_declarative_base(self):
with testing.expect_deprecated_20(
- 'The "bind" argument to declarative_base is'
+ "The ``bind`` argument to declarative_base is "
"deprecated and will be removed in SQLAlchemy 2.0.",
):
Base = declarative_base(bind=testing.db)
def test_as_declarative(self):
with testing.expect_deprecated_20(
- 'The "bind" argument to declarative_base is'
+ "The ``bind`` argument to as_declarative is "
"deprecated and will be removed in SQLAlchemy 2.0.",
):
@classmethod
def setup_class(cls):
- metadata = MetaData(testing.db)
+ metadata = MetaData()
global info_table
info_table = Table(
"infos",
Column("info", String(30)),
)
- info_table.create()
-
- info_table.insert().execute(
- {"pk": 1, "info": "pk_1_data"},
- {"pk": 2, "info": "pk_2_data"},
- {"pk": 3, "info": "pk_3_data"},
- {"pk": 4, "info": "pk_4_data"},
- {"pk": 5, "info": "pk_5_data"},
- {"pk": 6, "info": "pk_6_data"},
- )
+ with testing.db.begin() as conn:
+ info_table.create(conn)
+
+ conn.execute(
+ info_table.insert(),
+ [
+ {"pk": 1, "info": "pk_1_data"},
+ {"pk": 2, "info": "pk_2_data"},
+ {"pk": 3, "info": "pk_3_data"},
+ {"pk": 4, "info": "pk_4_data"},
+ {"pk": 5, "info": "pk_5_data"},
+ {"pk": 6, "info": "pk_6_data"},
+ ],
+ )
@classmethod
def teardown_class(cls):
- info_table.drop()
+ with testing.db.begin() as conn:
+ info_table.drop(conn)
@testing.fails_on("firebird", "FIXME: unknown")
@testing.requires.subqueries
- def test_case(self):
+ def test_case(self, connection):
inner = select(
case(
(info_table.c.pk < 3, "lessthan3"),
info_table.c.info,
).select_from(info_table)
- inner_result = inner.execute().fetchall()
+ inner_result = connection.execute(inner).all()
# Outputs:
# lessthan3 1 pk_1_data
outer = select(inner.alias("q_inner"))
- outer_result = outer.execute().fetchall()
+ outer_result = connection.execute(outer).all()
assert outer_result == [
("lessthan3", 1, "pk_1_data"),
info_table.c.info,
).select_from(info_table)
- else_result = w_else.execute().fetchall()
+ else_result = connection.execute(w_else).all()
eq_(
else_result,
"CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END",
)
- def test_text_doesnt_explode(self):
+ def test_text_doesnt_explode(self, connection):
for s in [
select(
).order_by(info_table.c.info),
]:
eq_(
- s.execute().fetchall(),
+ connection.execute(s).all(),
[("no",), ("no",), ("no",), ("yes",), ("no",), ("no",)],
)
assert_raises_message(
sa.exc.ArgumentError,
r"SQL expression for WHERE/HAVING role expected, "
- r"got \[(?:Sequence|ColumnDefault|DefaultClause)\('y'.*\)\]",
- t.select,
- [const],
+ r"got (?:Sequence|ColumnDefault|DefaultClause)\('y'.*\)",
+ t.select().where,
+ const,
)
assert_raises_message(
sa.exc.ArgumentError,
# TODO: add coverage for increment on a secondary column in a key
@testing.fails_on("firebird", "Data type unknown")
- def _test_autoincrement(self, bind):
+ def _test_autoincrement(self, connection):
aitable = self.tables.aitable
ids = set()
- rs = bind.execute(aitable.insert(), int1=1)
+ rs = connection.execute(aitable.insert(), int1=1)
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = bind.execute(aitable.insert(), str1="row 2")
+ rs = connection.execute(aitable.insert(), str1="row 2")
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = bind.execute(aitable.insert(), int1=3, str1="row 3")
+ rs = connection.execute(aitable.insert(), int1=3, str1="row 3")
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
ids.add(last)
- rs = bind.execute(aitable.insert(values={"int1": func.length("four")}))
+ rs = connection.execute(
+ aitable.insert().values({"int1": func.length("four")})
+ )
last = rs.inserted_primary_key[0]
self.assert_(last)
self.assert_(last not in ids)
)
eq_(
- list(bind.execute(aitable.select().order_by(aitable.c.id))),
+ list(connection.execute(aitable.select().order_by(aitable.c.id))),
[
(testing.db.dialect.default_sequence_base, 1, None),
(testing.db.dialect.default_sequence_base + 1, None, "row 2"),
)
def test_autoincrement_autocommit(self):
- self._test_autoincrement(testing.db)
+ with testing.db.connect() as conn:
+ self._test_autoincrement(conn)
def test_autoincrement_transaction(self):
with testing.db.begin() as conn:
table1 = self.tables.mytable
self.assert_compile(
- delete(table1, table1.c.myid == 7),
+ delete(table1).where(table1.c.myid == 7),
"DELETE FROM mytable WHERE mytable.myid = :myid_1",
)
# test a non-correlated WHERE clause
s = select(table2.c.othername).where(table2.c.otherid == 7)
self.assert_compile(
- delete(table1, table1.c.name == s.scalar_subquery()),
+ delete(table1).where(table1.c.name == s.scalar_subquery()),
"DELETE FROM mytable "
"WHERE mytable.name = ("
"SELECT myothertable.othername "
# test one that is actually correlated...
s = select(table2.c.othername).where(table2.c.otherid == table1.c.myid)
self.assert_compile(
- table1.delete(table1.c.name == s.scalar_subquery()),
+ table1.delete().where(table1.c.name == s.scalar_subquery()),
"DELETE FROM mytable "
"WHERE mytable.name = ("
"SELECT myothertable.othername "
#! coding: utf-8
+import itertools
+import random
+
from sqlalchemy import alias
from sqlalchemy import and_
from sqlalchemy import bindparam
from sqlalchemy import VARCHAR
from sqlalchemy.engine import default
from sqlalchemy.sql import coercions
+from sqlalchemy.sql import literal
from sqlalchemy.sql import operators
from sqlalchemy.sql import quoted_name
from sqlalchemy.sql import roles
+from sqlalchemy.sql import update
from sqlalchemy.sql import visitors
from sqlalchemy.sql.selectable import SelectStatementGrouping
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import not_in
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
+from .test_update import _UpdateFromTestBase
class ToMetaDataTest(fixtures.TestBase):
eq_(conn.execute(select(table)).first(), (5, 12))
-class DMLTest(fixtures.TestBase, AssertsCompiledSQL):
+class DMLTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
__dialect__ = "default"
def test_insert_inline_kw_defaults(self):
stmt, "UPDATE foo SET bar=%s LIMIT 10", dialect="mysql"
)
+ def test_update_whereclause(self):
+ table1 = table(
+ "mytable", Column("myid", Integer), Column("name", String(30)),
+ )
+
+ with testing.expect_deprecated_20(
+ "The update.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0"
+ ):
+ self.assert_compile(
+ table1.update(table1.c.myid == 7),
+ "UPDATE mytable SET myid=:myid, name=:name "
+ "WHERE mytable.myid = :myid_1",
+ )
+
+ def test_update_values(self):
+ table1 = table(
+ "mytable", Column("myid", Integer), Column("name", String(30)),
+ )
+
+ with testing.expect_deprecated_20(
+ "The update.values parameter will be removed in SQLAlchemy 2.0"
+ ):
+ self.assert_compile(
+ table1.update(values={table1.c.myid: 7}),
+ "UPDATE mytable SET myid=:myid",
+ )
+
+ def test_delete_whereclause(self):
+ table1 = table("mytable", Column("myid", Integer),)
+
+ with testing.expect_deprecated_20(
+ "The delete.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0"
+ ):
+ self.assert_compile(
+ table1.delete(table1.c.myid == 7),
+ "DELETE FROM mytable WHERE mytable.myid = :myid_1",
+ )
+
+ def test_update_ordered_parameters_fire_onupdate(self):
+ table = self.tables.update_w_default
+
+ values = [(table.c.y, table.c.x + 5), ("x", 10)]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ self.assert_compile(
+ table.update(preserve_parameter_order=True).values(values),
+ "UPDATE update_w_default "
+ "SET ycol=(update_w_default.x + :x_1), "
+ "x=:x, data=:data",
+ )
+
+ def test_update_ordered_parameters_override_onupdate(self):
+ table = self.tables.update_w_default
+
+ values = [
+ (table.c.y, table.c.x + 5),
+ (table.c.data, table.c.x + 10),
+ ("x", 10),
+ ]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ self.assert_compile(
+ table.update(preserve_parameter_order=True).values(values),
+ "UPDATE update_w_default "
+ "SET ycol=(update_w_default.x + :x_1), "
+ "data=(update_w_default.x + :x_2), x=:x",
+ )
+
+ def test_update_ordered_parameters_oldstyle_1(self):
+ table1 = self.tables.mytable
+
+ # Confirm that we can pass values as list value pairs
+ # note these are ordered *differently* from table.c
+ values = [
+ (table1.c.name, table1.c.name + "lala"),
+ (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
+ ]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0.",
+ "The update.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0",
+ "The update.values parameter will be removed in SQLAlchemy 2.0",
+ ):
+ self.assert_compile(
+ update(
+ table1,
+ (table1.c.myid == func.hoho(4))
+ & (
+ table1.c.name
+ == literal("foo") + table1.c.name + literal("lala")
+ ),
+ preserve_parameter_order=True,
+ values=values,
+ ),
+ "UPDATE mytable "
+ "SET "
+ "name=(mytable.name || :name_1), "
+ "myid=do_stuff(mytable.myid, :param_1) "
+ "WHERE "
+ "mytable.myid = hoho(:hoho_1) AND "
+ "mytable.name = :param_2 || mytable.name || :param_3",
+ )
+
+ def test_update_ordered_parameters_oldstyle_2(self):
+ table1 = self.tables.mytable
+
+ # Confirm that we can pass values as list value pairs
+ # note these are ordered *differently* from table.c
+ values = [
+ (table1.c.name, table1.c.name + "lala"),
+ ("description", "some desc"),
+ (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
+ ]
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0.",
+ "The update.whereclause parameter will be "
+ "removed in SQLAlchemy 2.0",
+ ):
+ self.assert_compile(
+ update(
+ table1,
+ (table1.c.myid == func.hoho(4))
+ & (
+ table1.c.name
+ == literal("foo") + table1.c.name + literal("lala")
+ ),
+ preserve_parameter_order=True,
+ ).values(values),
+ "UPDATE mytable "
+ "SET "
+ "name=(mytable.name || :name_1), "
+ "description=:description, "
+ "myid=do_stuff(mytable.myid, :param_1) "
+ "WHERE "
+ "mytable.myid = hoho(:hoho_1) AND "
+ "mytable.name = :param_2 || mytable.name || :param_3",
+ )
+
+ def test_update_preserve_order_reqs_listtups(self):
+ table1 = self.tables.mytable
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ testing.assert_raises_message(
+ ValueError,
+ r"When preserve_parameter_order is True, values\(\) "
+ r"only accepts a list of 2-tuples",
+ table1.update(preserve_parameter_order=True).values,
+ {"description": "foo", "name": "bar"},
+ )
+
+ @testing.fixture
+ def randomized_param_order_update(self):
+ from sqlalchemy.sql.dml import UpdateDMLState
+
+ super_process_ordered_values = UpdateDMLState._process_ordered_values
+
+ # this fixture is needed for Python 3.6 and above to work around
+ # dictionaries being insert-ordered. in python 2.7 the previous
+ # logic fails pretty easily without this fixture.
+ def _process_ordered_values(self, statement):
+ super_process_ordered_values(self, statement)
+
+ tuples = list(self._dict_parameters.items())
+ random.shuffle(tuples)
+ self._dict_parameters = dict(tuples)
+
+ dialect = default.StrCompileDialect()
+ dialect.paramstyle = "qmark"
+ dialect.positional = True
+
+ with mock.patch.object(
+ UpdateDMLState, "_process_ordered_values", _process_ordered_values
+ ):
+ yield
+
+ def random_update_order_parameters():
+ from sqlalchemy import ARRAY
+
+ t = table(
+ "foo",
+ column("data1", ARRAY(Integer)),
+ column("data2", ARRAY(Integer)),
+ column("data3", ARRAY(Integer)),
+ column("data4", ARRAY(Integer)),
+ )
+
+ idx_to_value = [
+ (t.c.data1, 5, 7),
+ (t.c.data2, 10, 18),
+ (t.c.data3, 8, 4),
+ (t.c.data4, 12, 14),
+ ]
+
+ def combinations():
+ while True:
+ random.shuffle(idx_to_value)
+ yield list(idx_to_value)
+
+ return testing.combinations(
+ *[
+ (t, combination)
+ for i, combination in zip(range(10), combinations())
+ ],
+ argnames="t, idx_to_value"
+ )
+
+ @random_update_order_parameters()
+ def test_update_to_expression_ppo(
+ self, randomized_param_order_update, t, idx_to_value
+ ):
+ dialect = default.StrCompileDialect()
+ dialect.paramstyle = "qmark"
+ dialect.positional = True
+
+ with testing.expect_deprecated_20(
+ "The update.preserve_parameter_order parameter will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ stmt = t.update(preserve_parameter_order=True).values(
+ [(col[idx], val) for col, idx, val in idx_to_value]
+ )
+
+ self.assert_compile(
+ stmt,
+ "UPDATE foo SET %s"
+ % (
+ ", ".join(
+ "%s[?]=?" % col.key for col, idx, val in idx_to_value
+ )
+ ),
+ dialect=dialect,
+ checkpositional=tuple(
+ itertools.chain.from_iterable(
+ (idx, val) for col, idx, val in idx_to_value
+ )
+ ),
+ )
+
class TableDeprecationTest(fixtures.TestBase):
def test_mustexists(self):
assert sql_util.ClauseAdapter(u).traverse(t1) is u
- def test_binds(self):
+ def test_bindparams(self):
"""test that unique bindparams change their name upon clone()
to prevent conflicts"""
t1alias = t1.alias("t1alias")
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
- vis.traverse(case([(t1.c.col1 == 5, t1.c.col2)], else_=t1.c.col1)),
+ vis.traverse(case((t1.c.col1 == 5, t1.c.col2), else_=t1.c.col1)),
"CASE WHEN (t1alias.col1 = :col1_1) THEN "
"t1alias.col2 ELSE t1alias.col1 END",
)
vis = sql_util.ClauseAdapter(t1alias)
self.assert_compile(
vis.traverse(
- case([(5, t1.c.col2)], value=t1.c.col1, else_=t1.c.col1)
+ case((5, t1.c.col2), value=t1.c.col1, else_=t1.c.col1)
),
"CASE t1alias.col1 WHEN :param_1 THEN "
"t1alias.col2 ELSE t1alias.col1 END",
"VALUES (:col1, :col2, :col3)",
)
- i2 = t1.insert(prefixes=["squiznart"])
+ i2 = t1.insert().prefix_with("squiznart")
self.assert_compile(
i2,
"INSERT squiznart INTO table1 (col1, col2, col3) "
)
def test_inline_values_single(self):
- i = t1.insert(values={"col1": 5})
+ i = t1.insert().values({"col1": 5})
compile_state = i._compile_state_factory(i, None)
is_(compile_state._has_multi_parameters, False)
def test_inline_values_multi(self):
- i = t1.insert(values=[{"col1": 5}, {"col1": 6}])
+ i = t1.insert().values([{"col1": 5}, {"col1": 6}])
compile_state = i._compile_state_factory(i, None)
)
def test_update_no_support_multi_constructor(self):
- stmt = t1.update(values=[{"col1": 5}, {"col1": 7}])
+ stmt = t1.update().values([{"col1": 5}, {"col1": 7}])
assert_raises_message(
exc.InvalidRequestError,
Column("stuff", String(20), onupdate="thisisstuff"),
)
meta.create_all(connection)
- connection.execute(t.insert(values=dict(value=func.length("one"))))
+ connection.execute(t.insert().values(value=func.length("one")))
eq_(connection.execute(t.select()).first().value, 3)
- connection.execute(t.update(values=dict(value=func.length("asfda"))))
+ connection.execute(t.update().values(value=func.length("asfda")))
eq_(connection.execute(t.select()).first().value, 5)
r = connection.execute(
- t.insert(values=dict(value=func.length("sfsaafsda")))
+ t.insert().values(value=func.length("sfsaafsda"))
)
id_ = r.inserted_primary_key[0]
- eq_(connection.execute(t.select(t.c.id == id_)).first().value, 9)
- connection.execute(t.update(values={t.c.value: func.length("asdf")}))
+ eq_(
+ connection.execute(t.select().where(t.c.id == id_)).first().value,
+ 9,
+ )
+ connection.execute(t.update().values({t.c.value: func.length("asdf")}))
eq_(connection.execute(t.select()).first().value, 4)
connection.execute(t2.insert())
- connection.execute(t2.insert(values=dict(value=func.length("one"))))
+ connection.execute(t2.insert().values(value=func.length("one")))
connection.execute(
- t2.insert(values=dict(value=func.length("asfda") + -19)),
- stuff="hi",
+ t2.insert().values(value=func.length("asfda") + -19), stuff="hi",
)
res = sorted(connection.execute(select(t2.c.value, t2.c.stuff)))
eq_(res, [(-14, "hi"), (3, None), (7, None)])
connection.execute(
- t2.update(values=dict(value=func.length("asdsafasd"))),
+ t2.update().values(value=func.length("asdsafasd")),
stuff="some stuff",
)
eq_(
connection.execute(t2.delete())
- connection.execute(
- t2.insert(values=dict(value=func.length("one") + 8))
- )
+ connection.execute(t2.insert().values(value=func.length("one") + 8))
eq_(connection.execute(t2.select()).first().value, 11)
- connection.execute(t2.update(values=dict(value=func.length("asfda"))))
+ connection.execute(t2.update().values(value=func.length("asfda")))
eq_(
connection.execute(select(t2.c.value, t2.c.stuff)).first(),
(5, "thisisstuff"),
)
connection.execute(
- t2.update(
- values={
- t2.c.value: func.length("asfdaasdf"),
- t2.c.stuff: "foo",
- }
+ t2.update().values(
+ {t2.c.value: func.length("asfdaasdf"), t2.c.stuff: "foo"}
)
)
checkparams = {"myid": 3, "name": "jack"}
self.assert_compile(
- insert(table1, dict(myid=3, name="jack")),
+ insert(table1).values(myid=3, name="jack"),
"INSERT INTO mytable (myid, name) VALUES (:myid, :name)",
checkparams=checkparams,
)
checkparams = {"myid": 3, "name": "jack", "unknowncol": "oops"}
- stmt = insert(table1, values=checkparams)
+ stmt = insert(table1).values(checkparams)
assert_raises_message(
exc.CompileError,
"Unconsumed column names: unknowncol",
{"myid": 4, "name": "someone", "unknowncol": "oops"},
]
- stmt = insert(table1, values=checkparams)
+ stmt = insert(table1).values(checkparams)
assert_raises_message(
exc.CompileError,
"Unconsumed column names: unknowncol",
}
self.assert_compile(
- insert(table1, (3, "jack", "mydescription")),
+ insert(table1).values([3, "jack", "mydescription"]),
"INSERT INTO mytable (myid, name, description) "
"VALUES (:myid, :name, :description)",
checkparams=checkparams,
table1 = self.tables.mytable
self.assert_compile(
- insert(table1, values=dict(myid=func.lala())),
+ insert(table1).values(myid=func.lala()),
"INSERT INTO mytable (myid) VALUES (lala())",
)
}
self.assert_compile(
- insert(table1, values),
+ insert(table1).values(values),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)",
)
values2 = {table1.c.name: bindparam("username")}
self.assert_compile(
- insert(table1, values=values1).values(values2),
+ insert(table1).values(values1).values(values2),
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)",
)
)
@testing.requires.multivalues_inserts
- def test_multivalues_insert(self):
+ def test_multivalues_insert(self, connection):
users = self.tables.users
- users.insert(
- values=[
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- ]
- ).execute()
- rows = users.select().order_by(users.c.user_id).execute().fetchall()
+ connection.execute(
+ users.insert().values(
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ ]
+ )
+ )
+ rows = connection.execute(
+ users.select().order_by(users.c.user_id)
+ ).all()
eq_(rows[0], (7, "jack"))
eq_(rows[1], (8, "ed"))
- users.insert(values=[(9, "jack"), (10, "ed")]).execute()
- rows = users.select().order_by(users.c.user_id).execute().fetchall()
+ connection.execute(users.insert().values([(9, "jack"), (10, "ed")]))
+ rows = connection.execute(
+ users.select().order_by(users.c.user_id)
+ ).all()
eq_(rows[2], (9, "jack"))
eq_(rows[3], (10, "ed"))
- def test_insert_heterogeneous_params(self):
+ def test_insert_heterogeneous_params(self, connection):
"""test that executemany parameters are asserted to match the
parameter set of the first."""
users = self.tables.users
"bind parameter 'user_name', in "
"parameter group 2\n"
r"\[SQL: u?INSERT INTO users",
- users.insert().execute,
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9},
+ connection.execute,
+ users.insert(),
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9},
+ ],
)
# this succeeds however. We aren't yet doing
# a length check on all subsequent parameters.
- users.insert().execute(
- {"user_id": 7}, {"user_id": 8, "user_name": "ed"}, {"user_id": 9}
+ connection.execute(
+ users.insert(),
+ {"user_id": 7},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9},
)
def _test_lastrow_accessor(self, table_, values, assertvalues):
):
is_(bool(comp.returning), True)
- result = engine.execute(table_.insert(), **values)
- ret = values.copy()
-
- for col, id_ in zip(
- table_.primary_key, result.inserted_primary_key
- ):
- ret[col.key] = id_
-
- if result.lastrow_has_defaults():
- criterion = and_(
- *[
- col == id_
- for col, id_ in zip(
- table_.primary_key, result.inserted_primary_key
- )
- ]
- )
- row = engine.execute(table_.select(criterion)).first()
- for c in table_.c:
- ret[c.key] = row._mapping[c]
+ with engine.begin() as connection:
+ result = connection.execute(table_.insert(), **values)
+ ret = values.copy()
+
+ for col, id_ in zip(
+ table_.primary_key, result.inserted_primary_key
+ ):
+ ret[col.key] = id_
+
+ if result.lastrow_has_defaults():
+ criterion = and_(
+ *[
+ col == id_
+ for col, id_ in zip(
+ table_.primary_key, result.inserted_primary_key
+ )
+ ]
+ )
+ row = connection.execute(
+ table_.select().where(criterion)
+ ).first()
+ for c in table_.c:
+ ret[c.key] = row._mapping[c]
return ret
if testing.against("firebird", "postgresql", "oracle", "mssql"):
Column("x", Integer, primary_key=True),
Column("y", Integer),
)
- t.create(eng)
- r = eng.execute(t.insert().values(y=5))
- eq_(r.inserted_primary_key, (0,))
+ with eng.begin() as conn:
+ t.create(conn)
+ r = conn.execute(t.insert().values(y=5))
+ eq_(r.inserted_primary_key, (0,))
@testing.fails_on(
"sqlite", "sqlite autoincrement doesn't work with composite pks"
)
@testing.provide_metadata
- def test_misordered_lastrow(self):
+ def test_misordered_lastrow(self, connection):
metadata = self.metadata
related = Table(
mariadb_engine="MyISAM",
)
- metadata.create_all()
- r = related.insert().values(id=12).execute()
+ metadata.create_all(connection)
+ r = connection.execute(related.insert().values(id=12))
id_ = r.inserted_primary_key[0]
eq_(id_, 12)
- r = t6.insert().values(manual_id=id_).execute()
+ r = connection.execute(t6.insert().values(manual_id=id_))
eq_(r.inserted_primary_key, (12, 1))
- def test_implicit_id_insert_select_columns(self):
+ def test_implicit_id_insert_select_columns(self, connection):
users = self.tables.users
stmt = users.insert().from_select(
(users.c.user_id, users.c.user_name),
users.select().where(users.c.user_id == 20),
)
- testing.db.execute(stmt)
+ r = connection.execute(stmt)
+ eq_(r.inserted_primary_key, (None,))
- def test_implicit_id_insert_select_keys(self):
+ def test_implicit_id_insert_select_keys(self, connection):
users = self.tables.users
stmt = users.insert().from_select(
["user_id", "user_name"],
users.select().where(users.c.user_id == 20),
)
- testing.db.execute(stmt)
+ r = connection.execute(stmt)
+ eq_(r.inserted_primary_key, (None,))
@testing.requires.empty_inserts
@testing.requires.returning
- def test_no_inserted_pk_on_returning(self):
+ def test_no_inserted_pk_on_returning(self, connection):
users = self.tables.users
- result = testing.db.execute(
+ result = connection.execute(
users.insert().returning(users.c.user_id, users.c.user_name)
)
assert_raises_message(
return t
def _test(
- self, stmt, row, returning=None, inserted_primary_key=False, table=None
+ self,
+ connection,
+ stmt,
+ row,
+ returning=None,
+ inserted_primary_key=False,
+ table=None,
):
- with testing.db.connect() as conn:
- r = conn.execute(stmt)
+ r = connection.execute(stmt)
- if returning:
- returned = r.first()
- eq_(returned, returning)
- elif inserted_primary_key is not False:
- eq_(r.inserted_primary_key, inserted_primary_key)
+ if returning:
+ returned = r.first()
+ eq_(returned, returning)
+ elif inserted_primary_key is not False:
+ eq_(r.inserted_primary_key, inserted_primary_key)
- if table is None:
- table = self.tables.foo
+ if table is None:
+ table = self.tables.foo
- eq_(conn.execute(table.select()).first(), row)
+ eq_(connection.execute(table.select()).first(), row)
- def _test_multi(self, stmt, rows, data):
- testing.db.execute(stmt, rows)
+ def _test_multi(self, connection, stmt, rows, data):
+ connection.execute(stmt, rows)
eq_(
- testing.db.execute(
+ connection.execute(
self.tables.foo.select().order_by(self.tables.foo.c.id)
- ).fetchall(),
+ ).all(),
data,
)
@testing.requires.sequences
- def test_explicit_sequence(self):
+ def test_explicit_sequence(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(
id=func.next_value(Sequence("t_id_seq")), data="data", x=5
),
(testing.db.dialect.default_sequence_base, "data", 5),
)
- def test_uppercase(self):
+ def test_uppercase(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(1,),
)
- def test_uppercase_inline(self):
+ def test_uppercase_inline(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().inline().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(1,),
"mssql+pyodbc",
"Pyodbc + SQL Server + Py3K, some decimal handling issue",
)
- def test_uppercase_inline_implicit(self):
+ def test_uppercase_inline_implicit(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().inline().values(data="data", x=5),
(1, "data", 5),
inserted_primary_key=(None,),
)
- def test_uppercase_implicit(self):
+ def test_uppercase_implicit(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=(testing.db.dialect.default_sequence_base,),
)
- def test_uppercase_direct_params(self):
+ def test_uppercase_direct_params(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(1,),
)
@testing.requires.returning
- def test_uppercase_direct_params_returning(self):
+ def test_uppercase_direct_params_returning(self, connection):
t = self.tables.foo
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5).returning(t.c.id, t.c.x),
(1, "data", 5),
returning=(1, 5),
)
@testing.requires.sql_expressions_inserted_as_primary_key
- def test_sql_expr_lastrowid(self):
+ def test_sql_expr_lastrowid(self, connection):
# see also test.orm.test_unitofwork.py
# ClauseAttributesTest.test_insert_pk_expression
t = self.tables.foo_no_seq
self._test(
+ connection,
t.insert().values(id=literal(5) + 10, data="data", x=5),
(15, "data", 5),
inserted_primary_key=(15,),
table=self.tables.foo_no_seq,
)
- def test_direct_params(self):
+ def test_direct_params(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
inserted_primary_key=(),
)
@testing.requires.returning
- def test_direct_params_returning(self):
+ def test_direct_params_returning(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(id=1, data="data", x=5).returning(t.c.id, t.c.x),
(testing.db.dialect.default_sequence_base, "data", 5),
returning=(testing.db.dialect.default_sequence_base, 5),
# does not indicate the Sequence
@testing.fails_if(testing.requires.sequences)
@testing.requires.emulated_lastrowid
- def test_implicit_pk(self):
+ def test_implicit_pk(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=(),
@testing.fails_if(testing.requires.sequences)
@testing.requires.emulated_lastrowid
- def test_implicit_pk_multi_rows(self):
+ def test_implicit_pk_multi_rows(self, connection):
t = self._fixture()
self._test_multi(
+ connection,
t.insert(),
[
{"data": "d1", "x": 5},
@testing.fails_if(testing.requires.sequences)
@testing.requires.emulated_lastrowid
- def test_implicit_pk_inline(self):
+ def test_implicit_pk_inline(self, connection):
t = self._fixture()
self._test(
+ connection,
t.insert().inline().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=(),
# version) generate a subquery for limits/offsets. ensure that the
# generated result map corresponds to the selected table, not the
# select query
- s = table1.select(
- use_labels=True, order_by=[table1.c.this_is_the_primarykey_column]
- ).limit(2)
+ s = (
+ table1.select()
+ .apply_labels()
+ .order_by(table1.c.this_is_the_primarykey_column)
+ .limit(2)
+ )
self._assert_labeled_table1_select(s)
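The `table.select(whereclause, ...)` positional form being rewritten here becomes a generative chain in 1.4/2.0. A small compile-only sketch (no database needed; the table name is illustrative):

```python
from sqlalchemy import column, select, table

t = table("some_table", column("id"), column("name"))

# 1.x accepted e.g. t.select(t.c.id == 4); the equivalent 1.4/2.0
# statement is built one method at a time.
stmt = select(t).where(t.c.id == 4).order_by(t.c.id).limit(2)

print(str(stmt))
```

Each method returns a new statement, so the chain can be split across variables or conditionals without mutating the original.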
def test_result_map_subquery(self):
table1 = self.table1
- s = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ s = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
s2 = select(s)
compiled = s2.compile(dialect=self._length_fixture())
table1 = self.table1
dialect = self._length_fixture()
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias()
+ )
s = select(q).apply_labels()
self.assert_compile(
def test_column_bind_labels_1(self):
table1 = self.table1
- s = table1.select(table1.c.this_is_the_primarykey_column == 4)
+ s = table1.select().where(table1.c.this_is_the_primarykey_column == 4)
self.assert_compile(
s,
"SELECT some_large_named_table.this_is_the_primarykey_column, "
def test_column_bind_labels_2(self):
table1 = self.table1
- s = table1.select(
+ s = table1.select().where(
or_(
table1.c.this_is_the_primarykey_column == 4,
table1.c.this_is_the_primarykey_column == 2,
def test_adjustable_1(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
compile_dialect = default.DefaultDialect(label_length=10)
def test_adjustable_2(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
table1 = self.table1
compile_dialect = default.DefaultDialect(label_length=4)
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
def test_adjustable_4(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias()
+ )
x = select(q).apply_labels()
compile_dialect = default.DefaultDialect(label_length=10)
def test_adjustable_5(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias()
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias()
+ )
x = select(q).apply_labels()
compile_dialect = default.DefaultDialect(label_length=4)
table1 = self.table1
q = (
- table1.select(table1.c.this_is_the_primarykey_column == 4)
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
.apply_labels()
.alias("foo")
)
def test_adjustable_result_schema_column_2(self):
table1 = self.table1
- q = table1.select(table1.c.this_is_the_primarykey_column == 4).alias(
- "foo"
+ q = (
+ table1.select()
+ .where(table1.c.this_is_the_primarykey_column == 4)
+ .alias("foo")
)
x = select(q)
def test_arg_insert_pk(self, connection):
t1 = self.tables.t1
result = connection.execute(
- t1.insert(return_defaults=[t1.c.insdef]).values(upddef=1)
+ t1.insert().return_defaults(t1.c.insdef).values(upddef=1)
)
eq_(
[
t1 = self.tables.t1
connection.execute(t1.insert().values(upddef=1))
result = connection.execute(
- t1.update(return_defaults=[t1.c.upddef]).values(data="d1")
+ t1.update().return_defaults(t1.c.upddef).values(data="d1")
)
eq_(
[result.returned_defaults._mapping[k] for k in (t1.c.upddef,)], [1]
table1 = self.tables.mytable
# test against a straight text subquery
- u = update(
- table1,
- values={
+ u = update(table1).values(
+ {
table1.c.name: text(
"(select name from mytable where id=mytable.id)"
)
- },
+ }
)
self.assert_compile(
u,
table1 = self.tables.mytable
mt = table1.alias()
- u = update(
- table1,
- values={
+ u = update(table1).values(
+ {
table1.c.name: select(mt.c.name)
.where(mt.c.myid == table1.c.myid)
.scalar_subquery()
- },
+ }
)
self.assert_compile(
u,
.where(table2.c.otherid == table1.c.myid)
.scalar_subquery()
)
- u = update(table1, table1.c.name == "jack", values={table1.c.name: s})
+ u = (
+ update(table1)
+ .where(table1.c.name == "jack")
+ .values({table1.c.name: s})
+ )
self.assert_compile(
u,
"UPDATE mytable SET name=(SELECT myothertable.otherid, "
# test a non-correlated WHERE clause
s = select(table2.c.othername).where(table2.c.otherid == 7)
- u = update(table1, table1.c.name == s.scalar_subquery())
+ u = update(table1).where(table1.c.name == s.scalar_subquery())
self.assert_compile(
u,
"UPDATE mytable SET myid=:myid, name=:name, "
# test one that is actually correlated...
s = select(table2.c.othername).where(table2.c.otherid == table1.c.myid)
- u = table1.update(table1.c.name == s.scalar_subquery())
+ u = table1.update().where(table1.c.name == s.scalar_subquery())
self.assert_compile(
u,
"UPDATE mytable SET myid=:myid, name=:name, "
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 7),
+ update(table1).where(table1.c.myid == 7),
"UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
params={table1.c.name: "fred"},
)
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 7),
+ update(table1).where(table1.c.myid == 7),
"UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
params={"name": "fred"},
)
table1 = self.tables.mytable
self.assert_compile(
- update(table1, values={table1.c.name: table1.c.myid}),
+ update(table1).values({table1.c.name: table1.c.myid}),
"UPDATE mytable SET name=mytable.myid",
)
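The `update(table, whereclause, values=...)` calling convention removed throughout these hunks has a direct generative equivalent. A compile-only sketch (the expected SQL matches the pattern asserted in these tests):

```python
from sqlalchemy import bindparam, column, table, update

t = table("mytable", column("myid"), column("name"))

# 1.x: update(t, t.c.myid == bindparam("crit"), values={t.c.name: "hi"})
# 1.4/2.0: criteria and SET values are each supplied generatively.
stmt = (
    update(t)
    .where(t.c.myid == bindparam("crit"))
    .values({t.c.name: "hi"})
)

print(str(stmt))
```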
table1 = self.tables.mytable
self.assert_compile(
- update(
- table1,
- whereclause=table1.c.name == bindparam("crit"),
- values={table1.c.name: "hi"},
- ),
+ update(table1)
+ .where(table1.c.name == bindparam("crit"))
+ .values({table1.c.name: "hi"}),
"UPDATE mytable SET name=:name WHERE mytable.name = :crit",
params={"crit": "notthere"},
checkparams={"crit": "notthere", "name": "hi"},
table1 = self.tables.mytable
self.assert_compile(
- update(
- table1,
- table1.c.myid == 12,
- values={table1.c.name: table1.c.myid},
- ),
+ update(table1)
+ .where(table1.c.myid == 12)
+ .values({table1.c.name: table1.c.myid}),
"UPDATE mytable "
"SET name=mytable.myid, description=:description "
"WHERE mytable.myid = :myid_1",
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 12, values={table1.c.myid: 9}),
+ update(table1)
+ .where(table1.c.myid == 12)
+ .values({table1.c.myid: 9}),
"UPDATE mytable "
"SET myid=:myid, description=:description "
"WHERE mytable.myid = :myid_1",
table1 = self.tables.mytable
self.assert_compile(
- update(table1, table1.c.myid == 12),
+ update(table1).where(table1.c.myid == 12),
"UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
params={"myid": 18},
checkparams={"myid": 18, "myid_1": 12},
def test_update_9(self):
table1 = self.tables.mytable
- s = table1.update(table1.c.myid == 12, values={table1.c.name: "lala"})
+ s = (
+ table1.update()
+ .where(table1.c.myid == 12)
+ .values({table1.c.name: "lala"})
+ )
c = s.compile(column_keys=["id", "name"])
eq_(str(s), str(c))
v1 = {table1.c.name: table1.c.myid}
v2 = {table1.c.name: table1.c.name + "foo"}
self.assert_compile(
- update(table1, table1.c.myid == 12, values=v1).values(v2),
+ update(table1).where(table1.c.myid == 12).values(v1).values(v2),
"UPDATE mytable "
"SET "
"name=(mytable.name || :name_1), "
}
self.assert_compile(
- update(
- table1,
+ update(table1)
+ .where(
(table1.c.myid == func.hoho(4))
& (
table1.c.name
== literal("foo") + table1.c.name + literal("lala")
- ),
- values=values,
- ),
+ )
+ )
+ .values(values),
"UPDATE mytable "
"SET "
"myid=do_stuff(mytable.myid, :param_1), "
column_keys=["j"],
)
- def test_update_ordered_parameters_oldstyle_1(self):
- table1 = self.tables.mytable
-
- # Confirm that we can pass values as list value pairs
- # note these are ordered *differently* from table.c
- values = [
- (table1.c.name, table1.c.name + "lala"),
- (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
- ]
- self.assert_compile(
- update(
- table1,
- (table1.c.myid == func.hoho(4))
- & (
- table1.c.name
- == literal("foo") + table1.c.name + literal("lala")
- ),
- preserve_parameter_order=True,
- values=values,
- ),
- "UPDATE mytable "
- "SET "
- "name=(mytable.name || :name_1), "
- "myid=do_stuff(mytable.myid, :param_1) "
- "WHERE "
- "mytable.myid = hoho(:hoho_1) AND "
- "mytable.name = :param_2 || mytable.name || :param_3",
- )
-
def test_update_ordered_parameters_newstyle_1(self):
table1 = self.tables.mytable
"mytable.name = :param_2 || mytable.name || :param_3",
)
- def test_update_ordered_parameters_oldstyle_2(self):
- table1 = self.tables.mytable
-
- # Confirm that we can pass values as list value pairs
- # note these are ordered *differently* from table.c
- values = [
- (table1.c.name, table1.c.name + "lala"),
- ("description", "some desc"),
- (table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
- ]
- self.assert_compile(
- update(
- table1,
- (table1.c.myid == func.hoho(4))
- & (
- table1.c.name
- == literal("foo") + table1.c.name + literal("lala")
- ),
- preserve_parameter_order=True,
- ).values(values),
- "UPDATE mytable "
- "SET "
- "name=(mytable.name || :name_1), "
- "description=:description, "
- "myid=do_stuff(mytable.myid, :param_1) "
- "WHERE "
- "mytable.myid = hoho(:hoho_1) AND "
- "mytable.name = :param_2 || mytable.name || :param_3",
- )
-
def test_update_ordered_parameters_newstyle_2(self):
table1 = self.tables.mytable
(table1.c.myid, func.do_stuff(table1.c.myid, literal("hoho"))),
]
self.assert_compile(
- update(
- table1,
+ update(table1)
+ .where(
(table1.c.myid == func.hoho(4))
& (
table1.c.name
== literal("foo") + table1.c.name + literal("lala")
),
- ).ordered_values(*values),
+ )
+ .ordered_values(*values),
"UPDATE mytable "
"SET "
"name=(mytable.name || :name_1), "
stmt.compile,
)
- def test_update_ordered_parameters_fire_onupdate(self):
- table = self.tables.update_w_default
-
- values = [(table.c.y, table.c.x + 5), ("x", 10)]
-
- self.assert_compile(
- table.update(preserve_parameter_order=True).values(values),
- "UPDATE update_w_default SET ycol=(update_w_default.x + :x_1), "
- "x=:x, data=:data",
- )
-
- def test_update_ordered_parameters_override_onupdate(self):
- table = self.tables.update_w_default
-
- values = [
- (table.c.y, table.c.x + 5),
- (table.c.data, table.c.x + 10),
- ("x", 10),
- ]
-
- self.assert_compile(
- table.update(preserve_parameter_order=True).values(values),
- "UPDATE update_w_default SET ycol=(update_w_default.x + :x_1), "
- "data=(update_w_default.x + :x_2), x=:x",
- )
-
- def test_update_preserve_order_reqs_listtups(self):
- table1 = self.tables.mytable
- testing.assert_raises_message(
- ValueError,
- r"When preserve_parameter_order is True, values\(\) "
- r"only accepts a list of 2-tuples",
- table1.update(preserve_parameter_order=True).values,
- {"description": "foo", "name": "bar"},
- )
-
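The removed tests above covered `preserve_parameter_order=True`, whose replacement is the `ordered_values()` method: the SET clause order is stated explicitly as 2-tuples rather than as a flag plus a list passed to `values()`. A compile-only sketch (column types and the `do_stuff` function name are illustrative):

```python
from sqlalchemy import String, column, func, literal, table, update

t = table("mytable", column("myid"), column("name", String))

# 2.0 style: ordered_values() fixes the SET clause ordering; here
# "name" is guaranteed to render before "myid".
stmt = update(t).ordered_values(
    (t.c.name, t.c.name + "lala"),
    (t.c.myid, func.do_stuff(t.c.myid, literal("hoho"))),
)

print(str(stmt))
```

This removes the need for the `ValueError` guard the deleted `test_update_preserve_order_reqs_listtups` exercised, since `ordered_values()` only accepts 2-tuples in the first place.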
def test_update_ordereddict(self):
table1 = self.tables.mytable
)
self.assert_compile(
- update(
- table1,
+ update(table1)
+ .where(
(table1.c.myid == func.hoho(4))
& (
table1.c.name
== literal("foo") + table1.c.name + literal("lala")
),
- values=values,
- ),
+ )
+ .values(values),
"UPDATE mytable "
"SET "
"myid=do_stuff(mytable.myid, :param_1), "
),
)
- @random_update_order_parameters()
- def test_update_to_expression_ppo(
- self, randomized_param_order_update, t, idx_to_value
- ):
- dialect = default.StrCompileDialect()
- dialect.paramstyle = "qmark"
- dialect.positional = True
-
- # deprecated pattern here
- stmt = t.update(preserve_parameter_order=True).values(
- [(col[idx], val) for col, idx, val in idx_to_value]
- )
-
- self.assert_compile(
- stmt,
- "UPDATE foo SET %s"
- % (
- ", ".join(
- "%s[?]=?" % col.key for col, idx, val in idx_to_value
- )
- ),
- dialect=dialect,
- checkpositional=tuple(
- itertools.chain.from_iterable(
- (idx, val) for col, idx, val in idx_to_value
- )
- ),
- )
-
def test_update_to_expression_three(self):
# this test is from test_defaults but exercises a particular
# parameter ordering issue
# against the alias, but we name the table-bound column
# in values. The behavior here isn't really defined
self.assert_compile(
- update(talias1, talias1.c.myid == 7).values(
- {table1.c.name: "fred"}
- ),
+ update(talias1)
+ .where(talias1.c.myid == 7)
+ .values({table1.c.name: "fred"}),
"UPDATE mytable AS t1 "
"SET name=:name "
"WHERE t1.myid = :myid_1",
# which is causing the "table1.c.name" param to be handled
# as an "extra table", hence we see the full table name rendered.
self.assert_compile(
- update(talias1, table1.c.myid == 7).values(
- {table1.c.name: "fred"}
- ),
+ update(talias1)
+ .where(table1.c.myid == 7)
+ .values({table1.c.name: "fred"}),
"UPDATE mytable AS t1 "
"SET name=:mytable_name "
"FROM mytable "
talias1 = table1.alias("t1")
self.assert_compile(
- update(talias1, table1.c.myid == 7).values(
- {table1.c.name: "fred"}
- ),
+ update(talias1)
+ .where(table1.c.myid == 7)
+ .values({table1.c.name: "fred"}),
"UPDATE mytable AS t1, mytable SET mytable.name=%s "
"WHERE mytable.myid = %s",
checkparams={"mytable_name": "fred", "myid_1": 7},
checkparams = {"email_address_1": "e1", "id_1": 7, "name": "newname"}
- cols = [addresses.c.id, addresses.c.user_id, addresses.c.email_address]
-
- subq = select(cols).where(addresses.c.id == 7).alias()
+ subq = (
+ select(
+ addresses.c.id, addresses.c.user_id, addresses.c.email_address
+ )
+ .where(addresses.c.id == 7)
+ .alias()
+ )
self.assert_compile(
users.update()
.values(name="newname")
PYTHONPATH=
PYTHONNOUSERSITE=1
MEMUSAGE=--nomemory
+ SQLALCHEMY_WARN_20=true
BASECOMMAND=python -m pytest --log-info=sqlalchemy.testing
WORKERS={env:TOX_WORKERS:-n4 --max-worker-restart=5}