when the "base" mapper was generated, thereby
causing failures later on. [ticket:2312]
+ - [bug] Fixed bug whereby column_property() created
+ against ORM-level column could be treated as
+ a distinct entity when producing certain
+ kinds of joined-inh joins. [ticket:2316]
+
- sql
+ - [bug] related to [ticket:2316], made some
+ adjustments to the change from [ticket:2261]
+ regarding the "from" list on a select(). The
+ _froms collection is no longer memoized, as this
+ simplifies various use cases and removes the
+ need for a "warning" if a column is attached
+ to a table after it was already used in an
+ expression - the select() construct will now
+ always produce the correct expression.
+ There's probably no real-world
+ performance hit here; select() objects are
+ almost always made ad-hoc, and systems that
+ wish to optimize the re-use of a select()
+ would be using the "compiled_cache" feature.
+ A hit which would occur when calling select.bind
+ has been reduced, but the vast majority
+ of users shouldn't be using "bound metadata"
+ anyway :).
+
- [feature] The update() construct can now accommodate
multiple tables in the WHERE clause, which will
render an "UPDATE..FROM" construct, recognized by
object_mapper, strategies, configure_mappers
from sqlalchemy.orm.util import CascadeOptions, _class_to_mapper, \
_orm_annotate, _orm_deannotate
+
from sqlalchemy.orm.interfaces import MANYTOMANY, MANYTOONE, \
MapperProperty, ONETOMANY, PropComparator, StrategizedProperty
mapperlib = util.importlater("sqlalchemy.orm", "mapperlib")
:param extension:
"""
- self.columns = [expression._labeled(c) for c in columns]
+ self.columns = [expression._labeled(_orm_deannotate(c))
+ for c in columns]
self.group = kwargs.pop('group', None)
self.deferred = kwargs.pop('deferred', False)
self.instrument = kwargs.pop('_instrument', True)
def _get_table(self):
return self.__dict__['table']
def _set_table(self, table):
- if '_from_objects' in self.__dict__:
- util.warn("%s being associated with %s object after "
- "the %s has already been used in a SQL "
- "generation; previously generated "
- "constructs may contain stale state." %
- (type(table), type(self), type(self)))
self._memoized_property.expire_instance(self)
self.__dict__['table'] = table
table = property(_get_table, _set_table)
_SelectBase.__init__(self, **kwargs)
- @_memoized_property
+ @property
def _froms(self):
+ # would love to cache this,
+ # but there's just enough edge cases, particularly now that
+ # declarative encourages construction of SQL expressions
+ # without tables present, to just regen this each time.
froms = []
seen = set()
translate = self._from_cloned
-
def add(items):
for item in items:
if translate and item in translate:
actually be rendered.
"""
- return self._froms + list(_from_objects(*self._froms))
+ froms = self._froms
+ return froms + list(_from_objects(*froms))
@property
def inner_columns(self):
def bind(self):
if self._bind:
return self._bind
- if not self._froms:
+ froms = self._froms
+ if not froms:
for c in self._raw_columns:
e = c.bind
if e:
self._bind = e
return e
else:
- e = list(self._froms)[0].bind
+ e = list(froms)[0].bind
if e:
self._bind = e
return e
# one more time, count the SQL
- sess2 = sessionmaker()()
+ sess2 = sessionmaker(testing.db)()
self.assert_sql_count(testing.db, go, 2)
class LoadManyToOneFromIdentityTest(fixtures.MappedTest):
def test_baseline_1a_populate(self):
Zoo = metadata.tables['Zoo']
Animal = metadata.tables['Animal']
- wap = Zoo.insert().execute(Name=u'Wild Animal Park',
+ engine = metadata.bind
+ wap = engine.execute(Zoo.insert(), Name=u'Wild Animal Park',
Founded=datetime.date(2000, 1, 1),
Opens=datetime.time(8, 15, 59),
LastEscape=
datetime.datetime(2004, 7, 29, 5, 6, 7),
Admission=4.95).inserted_primary_key[0]
- sdz = Zoo.insert().execute(Name=u'San Diego Zoo',
+ sdz = engine.execute(Zoo.insert(), Name=u'San Diego Zoo',
Founded=datetime.date(1935, 9, 13),
Opens=datetime.time(9, 0, 0),
Admission=0).inserted_primary_key[0]
- Zoo.insert(inline=True).execute(Name=u'Montr\xe9al Biod\xf4me',
+ engine.execute(Zoo.insert(inline=True), Name=u'Montr\xe9al Biod\xf4me',
Founded=datetime.date(1992, 6, 19),
Opens=datetime.time(9, 0, 0), Admission=11.75)
- seaworld = Zoo.insert().execute(Name=u'Sea_World',
+ seaworld = engine.execute(Zoo.insert(), Name=u'Sea_World',
Admission=60).inserted_primary_key[0]
# Let's add a crazy futuristic Zoo to test large date values.
- lp = Zoo.insert().execute(Name=u'Luna Park',
+ lp = engine.execute(Zoo.insert(), Name=u'Luna Park',
Founded=datetime.date(2072, 7, 17),
Opens=datetime.time(0, 0, 0),
Admission=134.95).inserted_primary_key[0]
# Animals
- leopardid = Animal.insert().execute(Species=u'Leopard',
+ leopardid = engine.execute(Animal.insert(), Species=u'Leopard',
Lifespan=73.5).inserted_primary_key[0]
- Animal.update(Animal.c.ID == leopardid).execute(ZooID=wap,
+ engine.execute(Animal.update(Animal.c.ID == leopardid), ZooID=wap,
LastEscape=datetime.datetime( 2004, 12, 21, 8, 15, 0, 999907,)
)
- lion = Animal.insert().execute(Species=u'Lion',
+ lion = engine.execute(Animal.insert(), Species=u'Lion',
ZooID=wap).inserted_primary_key[0]
- Animal.insert().execute(Species=u'Slug', Legs=1, Lifespan=.75)
- tiger = Animal.insert().execute(Species=u'Tiger',
+ engine.execute(Animal.insert(), Species=u'Slug', Legs=1, Lifespan=.75)
+ tiger = engine.execute(Animal.insert(), Species=u'Tiger',
ZooID=sdz).inserted_primary_key[0]
# Override Legs.default with itself just to make sure it works.
- Animal.insert(inline=True).execute(Species=u'Bear', Legs=4)
- Animal.insert(inline=True).execute(Species=u'Ostrich', Legs=2,
+ engine.execute(Animal.insert(inline=True), Species=u'Bear', Legs=4)
+ engine.execute(Animal.insert(inline=True), Species=u'Ostrich', Legs=2,
Lifespan=103.2)
- Animal.insert(inline=True).execute(Species=u'Centipede',
+ engine.execute(Animal.insert(inline=True), Species=u'Centipede',
Legs=100)
- emp = Animal.insert().execute(Species=u'Emperor Penguin',
+ emp = engine.execute(Animal.insert(), Species=u'Emperor Penguin',
Legs=2, ZooID=seaworld).inserted_primary_key[0]
- adelie = Animal.insert().execute(Species=u'Adelie Penguin',
+ adelie = engine.execute(Animal.insert(), Species=u'Adelie Penguin',
Legs=2, ZooID=seaworld).inserted_primary_key[0]
- Animal.insert(inline=True).execute(Species=u'Millipede',
+ engine.execute(Animal.insert(inline=True), Species=u'Millipede',
Legs=1000000, ZooID=sdz)
# Add a mother and child to test relationships
- bai_yun = Animal.insert().execute(Species=u'Ape',
+ bai_yun = engine.execute(Animal.insert(), Species=u'Ape',
Name=u'Bai Yun', Legs=2).inserted_primary_key[0]
- Animal.insert(inline=True).execute(Species=u'Ape',
+ engine.execute(Animal.insert(inline=True), Species=u'Ape',
Name=u'Hua Mei', Legs=2, MotherID=bai_yun)
def test_baseline_2_insert(self):
def test_baseline_3_properties(self):
Zoo = metadata.tables['Zoo']
Animal = metadata.tables['Animal']
+ engine = metadata.bind
def fullobject(select):
"""Iterate over the full result row."""
- return list(select.execute().first())
+ return list(engine.execute(select).first())
for x in xrange(ITERATIONS):
def test_baseline_4_expressions(self):
Zoo = metadata.tables['Zoo']
Animal = metadata.tables['Animal']
+ engine = metadata.bind
def fulltable(select):
"""Iterate over the full result table."""
- return [list(row) for row in select.execute().fetchall()]
+ return [list(row) for row in engine.execute(select).fetchall()]
for x in xrange(ITERATIONS):
assert len(fulltable(Zoo.select())) == 5
def test_baseline_5_aggregates(self):
Animal = metadata.tables['Animal']
Zoo = metadata.tables['Zoo']
+ engine = metadata.bind
+
for x in xrange(ITERATIONS):
# views
- view = select([Animal.c.Legs]).execute().fetchall()
+ view = engine.execute(select([Animal.c.Legs])).fetchall()
legs = [x[0] for x in view]
legs.sort()
expected = {
'Ape': None,
'Tick': None,
}
- for species, lifespan in select([Animal.c.Species,
- Animal.c.Lifespan]).execute().fetchall():
+ for species, lifespan in engine.execute(select([Animal.c.Species,
+ Animal.c.Lifespan])).fetchall():
assert lifespan == expected[species]
expected = [u'Montr\xe9al Biod\xf4me', 'Wild Animal Park']
e = select([Zoo.c.Name], and_(Zoo.c.Founded != None,
Zoo.c.Founded <= func.current_timestamp(),
Zoo.c.Founded >= datetime.date(1990, 1, 1)))
- values = [val[0] for val in e.execute().fetchall()]
+ values = [val[0] for val in engine.execute(e).fetchall()]
assert set(values) == set(expected)
# distinct
- legs = [x[0] for x in select([Animal.c.Legs],
- distinct=True).execute().fetchall()]
+ legs = [x[0] for x in engine.execute(select([Animal.c.Legs],
+ distinct=True)).fetchall()]
legs.sort()
def test_baseline_6_editing(self):
Zoo = metadata.tables['Zoo']
+ engine = metadata.bind
for x in xrange(ITERATIONS):
# Edit
- SDZ = Zoo.select(Zoo.c.Name == u'San Diego Zoo'
- ).execute().first()
- Zoo.update(Zoo.c.ID == SDZ['ID'
- ]).execute(Name=u'The San Diego Zoo',
+ SDZ = engine.execute(Zoo.select(Zoo.c.Name == u'San Diego Zoo'
+ )).first()
+ engine.execute(Zoo.update(Zoo.c.ID == SDZ['ID'
+ ]), Name=u'The San Diego Zoo',
Founded=datetime.date(1900, 1, 1),
Opens=datetime.time(7, 30, 0),
Admission='35.00')
# Test edits
- SDZ = Zoo.select(Zoo.c.Name == u'The San Diego Zoo'
- ).execute().first()
+ SDZ = engine.execute(Zoo.select(Zoo.c.Name == u'The San Diego Zoo'
+ )).first()
assert SDZ['Founded'] == datetime.date(1900, 1, 1), \
SDZ['Founded']
# Change it back
- Zoo.update(Zoo.c.ID == SDZ['ID'
- ]).execute(Name=u'San Diego Zoo',
+ engine.execute(Zoo.update(Zoo.c.ID == SDZ['ID'
+ ]), Name=u'San Diego Zoo',
Founded=datetime.date(1935, 9, 13),
Opens=datetime.time(9, 0, 0),
Admission='0')
# Test re-edits
- SDZ = Zoo.select(Zoo.c.Name == u'San Diego Zoo'
- ).execute().first()
+ SDZ = engine.execute(Zoo.select(Zoo.c.Name == u'San Diego Zoo'
+ )).first()
assert SDZ['Founded'] == datetime.date(1935, 9, 13)
def test_baseline_7_multiview(self):
Zoo = metadata.tables['Zoo']
Animal = metadata.tables['Animal']
+ engine = metadata.bind
def fulltable(select):
"""Iterate over the full result table."""
- return [list(row) for row in select.execute().fetchall()]
+ return [list(row) for row in engine.execute(select).fetchall()]
for x in xrange(ITERATIONS):
za = fulltable(select([Zoo.c.ID] + list(Animal.c),
recorder = lambda : dbapi_session.recorder(creator())
engine = engines.testing_engine(options={'creator': recorder, 'use_reaper':False})
metadata = MetaData(engine)
- session = sessionmaker()()
+ session = sessionmaker(engine)()
engine.connect()
def test_baseline_1_create_tables(self):
Zoo = metadata.tables['Zoo']
# TODO: convert to ORM
-
+ engine = metadata.bind
for x in xrange(ITERATIONS):
# views
- view = select([Animal.c.Legs]).execute().fetchall()
+ view = engine.execute(select([Animal.c.Legs])).fetchall()
legs = [x[0] for x in view]
legs.sort()
expected = {
'Ape': None,
'Tick': None,
}
- for species, lifespan in select([Animal.c.Species,
- Animal.c.Lifespan]).execute().fetchall():
+ for species, lifespan in engine.execute(select([Animal.c.Species,
+ Animal.c.Lifespan])).fetchall():
assert lifespan == expected[species]
expected = [u'Montr\xe9al Biod\xf4me', 'Wild Animal Park']
e = select([Zoo.c.Name], and_(Zoo.c.Founded != None,
Zoo.c.Founded <= func.current_timestamp(),
Zoo.c.Founded >= datetime.date(1990, 1, 1)))
- values = [val[0] for val in e.execute().fetchall()]
+ values = [val[0] for val in engine.execute(e).fetchall()]
assert set(values) == set(expected)
# distinct
- legs = [x[0] for x in select([Animal.c.Legs],
- distinct=True).execute().fetchall()]
+ legs = [x[0] for x in engine.execute(select([Animal.c.Legs],
+ distinct=True)).fetchall()]
legs.sort()
def test_baseline_6_editing(self):
player = lambda : dbapi_session.player()
engine = create_engine('postgresql:///', creator=player)
metadata = MetaData(engine)
- session = sessionmaker()()
+ session = sessionmaker(engine)()
engine.connect()
@profiling.function_call_count(5600)
id = Column('id', Integer, primary_key=True,
test_needs_autoincrement=True)
name = Column('name', String(50))
+
adr_count = \
- sa.orm.column_property(sa.select([sa.func.count(Address.id)],
- Address.user_id == id).as_scalar())
+ sa.orm.column_property(
+ sa.select([sa.func.count(Address.id)],
+ Address.user_id == id).as_scalar())
addresses = relationship(Address)
Base.metadata.create_all()
from test.lib.assertsql import CompiledSQL
import logging
-class MapperTest(_fixtures.FixtureTest):
+class MapperTest(_fixtures.FixtureTest, AssertsCompiledSQL):
+ __dialect__ = 'default'
def test_prop_shadow(self):
"""A backref name may not shadow an existing property name."""
u.name = 'jacko'
assert m._columntoproperty[users.c.name] is m.get_property('_name')
+ def test_add_column_prop_deannotate(self):
+ User, users = self.classes.User, self.tables.users
+ Address, addresses = self.classes.Address, self.tables.addresses
+ class SubUser(User):
+ pass
+ m = mapper(User, users)
+ m2 = mapper(SubUser, addresses, inherits=User)
+ m3 = mapper(Address, addresses, properties={
+ 'foo':relationship(m2)
+ })
+ # add property using annotated User.name,
+ # needs to be deannotated
+ m.add_property("x", column_property(User.name + "name"))
+ s = create_session()
+ q = s.query(m2).select_from(Address).join(Address.foo)
+ self.assert_compile(
+ q,
+ "SELECT "
+ "anon_1.addresses_id AS anon_1_addresses_id, "
+ "anon_1.users_id AS anon_1_users_id, "
+ "anon_1.users_name AS anon_1_users_name, "
+ "anon_1.addresses_user_id AS anon_1_addresses_user_id, "
+ "anon_1.addresses_email_address AS "
+ "anon_1_addresses_email_address, "
+ "anon_1.users_name || :name_1 AS anon_2 "
+ "FROM addresses JOIN (SELECT users.id AS users_id, "
+ "users.name AS users_name, addresses.id AS addresses_id, "
+ "addresses.user_id AS addresses_user_id, "
+ "addresses.email_address AS addresses_email_address "
+ "FROM users JOIN addresses ON users.id = "
+ "addresses.user_id) AS anon_1 ON "
+ "anon_1.users_id = addresses.user_id"
+ )
+
+ def test_column_prop_deannotate(self):
+ """test that column property deannotates,
+ bringing expressions down to the original mapped columns.
+ """
+ User, users = self.classes.User, self.tables.users
+ m = mapper(User, users)
+ assert User.id.property.columns[0] is users.c.id
+ assert User.name.property.columns[0] is users.c.name
+ expr = User.name + "name"
+ expr2 = sa.select([User.name, users.c.id])
+ m.add_property("x", column_property(expr))
+ m.add_property("y", column_property(expr2))
+
+ assert User.x.property.columns[0] is not expr
+ assert User.x.property.columns[0].element.left is users.c.name
+ assert User.x.property.columns[0].element.right is not expr.right
+
+ assert User.y.property.columns[0] is not expr2
+ assert User.y.property.columns[0].element.\
+ _raw_columns[0] is users.c.name
+ assert User.y.property.columns[0].element.\
+ _raw_columns[1] is users.c.id
+
def test_synonym_replaces_backref(self):
addresses, users, User = (self.tables.addresses,
self.tables.users,
s2 = select([c2])
s3 = sql_util.ClauseAdapter(s).traverse(s2)
- # the adaptation process needs the full
- # FROM list so can't avoid the warning on
- # this one
- assert_raises_message(
- exc.SAWarning,
- r"\<class 'sqlalchemy.schema.Table'\> being associated ",
- Table, 't', MetaData(), c1
+ Table('t', MetaData(), c1, c2)
+
+ self.assert_compile(
+ s3,
+ "SELECT t.c2 FROM t"
)
def test_from_list_with_columns(self):
c1 = Column('c1', Integer)
s = select([c1])
- # force a compile.
- eq_(str(s), "SELECT c1")
+ # force a compile.
+ self.assert_compile(
+ s,
+ "SELECT c1"
+ )
- # this will emit a warning
- assert_raises_message(
- exc.SAWarning,
- r"\<class 'sqlalchemy.schema.Table'\> being associated "
- r"with \<class 'sqlalchemy.schema.Column'\> object after "
- r"the \<class 'sqlalchemy.schema.Column'\> has already "
- r"been used in a SQL generation; previously "
- r"generated constructs may contain stale state.",
- Table, 't', MetaData(), c1
+ Table('t', MetaData(), c1)
+
+ self.assert_compile(
+ s,
+ "SELECT t.c1 FROM t"
)
def test_from_list_recovers_after_warning(self):
s = select([c1])
- # force a compile.
+ # force a compile.
eq_(str(s), "SELECT c1")
@testing.emits_warning()
# 's' has been baked. Can't afford
# not caching select._froms.
# hopefully the warning will clue the user
- self.assert_compile(s, "SELECT t.c1")
+ self.assert_compile(s, "SELECT t.c1 FROM t")
self.assert_compile(select([c1]), "SELECT t.c1 FROM t")
self.assert_compile(select([c2]), "SELECT t.c2 FROM t")