prop = self._props.get(key, None)
if isinstance(prop, properties.ColumnProperty):
- # TODO: the "property already exists" case is still not
- # well defined here. assuming single-column, etc.
-
- if prop.parent is not self:
- # existing properties.ColumnProperty from an inheriting mapper.
- # make a copy and append our column to it
- prop = prop.copy()
- else:
- util.warn(
+ if prop.parent is self:
+ raise sa_exc.InvalidRequestError(
"Implicitly combining column %s with column "
- "%s under attribute '%s'. This usage will be "
- "prohibited in 0.7. Please configure one "
+ "%s under attribute '%s'. Please configure one "
"or more attributes for these same-named columns "
"explicitly."
% (prop.columns[-1], column, key))
-
- # this hypothetically changes to
- # prop.columns.insert(0, column) when we do [ticket:1892]
- prop.columns.append(column)
- self._log("appending to existing properties.ColumnProperty %s" % (key))
+
+ # existing properties.ColumnProperty from an inheriting
+ # mapper. make a copy and append our column to it
+ prop = prop.copy()
+ prop.columns.insert(0, column)
+ self._log("inserting column to existing list "
+ "in properties.ColumnProperty %s" % (key))
elif prop is None or isinstance(prop, properties.ConcreteInheritedProperty):
mapped_column = []
eq_(u1.name, 'u1')
self.assert_sql_count(testing.db, go, 1)
-
+
+ def test_mapping_to_join(self):
+ users = Table('users', Base.metadata,
+ Column('id', Integer, primary_key=True)
+ )
+ addresses = Table('addresses', Base.metadata,
+ Column('id', Integer, primary_key=True),
+ Column('user_id', Integer, ForeignKey('users.id'))
+ )
+ usersaddresses = sa.join(users, addresses, users.c.id
+ == addresses.c.user_id)
+ class User(Base):
+ __table__ = usersaddresses
+ __table_args__ = {'primary_key':[users.c.id]}
+
+ # need to use column_property for now
+ user_id = column_property(users.c.id, addresses.c.user_id)
+ address_id = addresses.c.id
+
+ assert User.__mapper__.get_property('user_id').columns[0] \
+ is users.c.id
+ assert User.__mapper__.get_property('user_id').columns[1] \
+ is addresses.c.user_id
+
def test_synonym_inline(self):
class User(Base, ComparableEntity):
any(Engineer.primary_language
== 'cobol')).first(), c2)
- # ensure that the Manager mapper was compiled with the Person id
- # column as higher priority. this ensures that "id" will get
- # loaded from the Person row and not the possibly non-present
- # Manager row
+ # ensure that the Manager mapper was compiled with the Manager id
+ # column as higher priority. this ensures that "Manager.id"
+ # is appropriately treated as the "id" column in the "manager"
+ # table (reversed from 0.6's behavior.)
- assert Manager.id.property.columns == [Person.__table__.c.id,
- Manager.__table__.c.id]
+ assert Manager.id.property.columns == [Manager.__table__.c.id, Person.__table__.c.id]
# assert that the "id" column is available without a second
- # load. this would be the symptom of the previous step not being
- # correct.
+ # load. as of 0.7, the ColumnProperty tests all columns
+ # in it's list to see which is present in the row.
sess.expunge_all()
def go():
assert sess.query(Manager).filter(Manager.name == 'dogbert'
).one().id
-
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
"""Test that foreign keys that reference a literal 'id' subclass
'id' attribute behave intuitively.
- See ticket 1892.
+ See [ticket:1892].
"""
class Booking(Base):
sa.orm.compile_mappers() # no exceptions here
+ def test_foreign_keys_with_col(self):
+ """Test that foreign keys that reference a literal 'id' subclass
+ 'id' attribute behave intuitively.
+
+ See [ticket:1892].
+
+ """
+ class Booking(Base):
+ __tablename__ = 'booking'
+ id = Column(Integer, primary_key=True)
+
+ class PlanBooking(Booking):
+ __tablename__ = 'plan_booking'
+ id = Column(Integer, ForeignKey(Booking.id),
+ primary_key=True)
+
+ # referencing PlanBooking.id gives us the column
+ # on plan_booking, not booking
+ class FeatureBooking(Booking):
+ __tablename__ = 'feature_booking'
+ id = Column(Integer, ForeignKey(Booking.id),
+ primary_key=True)
+ plan_booking_id = Column(Integer,
+ ForeignKey(PlanBooking.id))
+
+ plan_booking = relationship(PlanBooking,
+ backref='feature_bookings')
+
+ assert FeatureBooking.__table__.c.plan_booking_id.\
+ references(PlanBooking.__table__.c.id)
+
+ assert FeatureBooking.__table__.c.id.\
+ references(Booking.__table__.c.id)
+
+
def test_single_colsonbase(self):
"""test single inheritance where all the columns are on the base
class."""
)
-
class FalseDiscriminatorTest(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
primary_key=[person_table.c.id, employee_table.c.id])
assert_raises_message(sa_exc.SAWarning,
r"On mapper Mapper\|Employee\|employees, "
- "primary key column 'employees.id' is being "
- "combined with distinct primary key column 'persons.id' "
+ "primary key column 'persons.id' is being "
+ "combined with distinct primary key column 'employees.id' "
"in attribute 'id'. Use explicit properties to give "
"each column its own mapped attribute name.",
self._do_test, True
# column of both tables.
eq_(
class_mapper(Sub).get_property('base_id').columns,
- [base.c.base_id, subtable.c.base_id]
+ [subtable.c.base_id, base.c.base_id]
)
def test_override_explicit(self):
# PK col
assert s2.id == s2.base_id != 15
- @testing.emits_warning(r'Implicit')
def test_override_implicit(self):
- # this is how the pattern looks intuitively when
- # using declarative.
- # fixed as part of [ticket:1111]
+ # this is originally [ticket:1111].
+ # the pattern here is now disallowed by [ticket:1892]
class Base(object):
pass
mapper(Base, base, properties={
'id':base.c.base_id
})
- mapper(Sub, subtable, inherits=Base, properties={
- 'id':subtable.c.base_id
- })
+ def go():
+ mapper(Sub, subtable, inherits=Base, properties={
+ 'id':subtable.c.base_id
+ })
# Sub mapper compilation needs to detect that "base.c.base_id"
# is renamed in the inherited mapper as "id", even though
- # it has its own "id" property. Sub's "id" property
- # gets joined normally with the extra column.
-
- eq_(
- set(class_mapper(Sub).get_property('id').columns),
- set([base.c.base_id, subtable.c.base_id])
- )
-
- s1 = Sub()
- s1.id = 10
- sess = create_session()
- sess.add(s1)
- sess.flush()
- assert sess.query(Sub).get(10) is s1
+ # it has its own "id" property. It then generates
+ # an exception in 0.7 due to the implicit conflict.
+ assert_raises(sa_exc.InvalidRequestError, go)
def test_plain_descriptor(self):
"""test that descriptors prevent inheritance from propigating properties to subclasses."""
sess.query(Company).filter(Company.employees.of_type(Engineer).any(and_(Engineer.primary_language=='cobol'))).one(),
c2
)
-
+
def test_join_from_columns_or_subclass(self):
sess = create_session()
[(u'dilbert',), (u'dilbert',), (u'dogbert',), (u'dogbert',), (u'pointy haired boss',), (u'vlad',), (u'wally',), (u'wally',)]
)
+ # Load Person.name, joining from Person -> paperwork, get all
+ # the people.
eq_(
- sess.query(Person.name).join((paperwork, Manager.person_id==paperwork.c.person_id)).order_by(Person.name).all(),
+ sess.query(Person.name).join((paperwork, Person.person_id==paperwork.c.person_id)).order_by(Person.name).all(),
[(u'dilbert',), (u'dilbert',), (u'dogbert',), (u'dogbert',), (u'pointy haired boss',), (u'vlad',), (u'wally',), (u'wally',)]
)
+ # same, on manager. get only managers.
+ eq_(
+ sess.query(Manager.name).join((paperwork, Manager.person_id==paperwork.c.person_id)).order_by(Person.name).all(),
+ [(u'dogbert',), (u'dogbert',), (u'pointy haired boss',)]
+ )
+
+ if select_type == '':
+ # this now raises, due to [ticket:1892]. Manager.person_id is now the "person_id" column on Manager.
+ # the SQL is incorrect.
+ assert_raises(
+ sa_exc.DBAPIError,
+ sess.query(Person.name).join((paperwork, Manager.person_id==paperwork.c.person_id)).order_by(Person.name).all,
+ )
+ elif select_type == 'Unions':
+ # with the union, not something anyone would really be using here, it joins to
+ # the full result set. This is 0.6's behavior and is more or less wrong.
+ eq_(
+ sess.query(Person.name).join((paperwork, Manager.person_id==paperwork.c.person_id)).order_by(Person.name).all(),
+ [(u'dilbert',), (u'dilbert',), (u'dogbert',), (u'dogbert',), (u'pointy haired boss',), (u'vlad',), (u'wally',), (u'wally',)]
+ )
+ else:
+ # when a join is present and managers.person_id is available, you get the managers.
+ eq_(
+ sess.query(Person.name).join((paperwork, Manager.person_id==paperwork.c.person_id)).order_by(Person.name).all(),
+ [(u'dogbert',), (u'dogbert',), (u'pointy haired boss',)]
+ )
+
+
eq_(
sess.query(Manager).join((Paperwork, Manager.paperwork)).order_by(Manager.name).all(),
[m1, b1]
# test the same again
self.assert_compile(
session.query(Child2).join(Child2.right_children).filter(Child1.left_child2==c22).with_labels().statement,
- "SELECT parent.id AS parent_id, child2.id AS child2_id, parent.cls AS parent_cls FROM "
+ "SELECT child2.id AS child2_id, parent.id AS parent_id, parent.cls AS parent_cls FROM "
"secondary AS secondary_1, parent JOIN child2 ON parent.id = child2.id JOIN secondary AS secondary_2 "
"ON parent.id = secondary_2.left_id JOIN (SELECT parent.id AS parent_id, parent.cls AS parent_cls, "
"child1.id AS child1_id FROM parent JOIN child1 ON parent.id = child1.id) AS anon_1 ON "
# test that the splicing of the join works here, doesnt break in the middle of "parent join child1"
self.assert_compile(q.limit(1).with_labels().statement,
- "SELECT anon_1.parent_id AS anon_1_parent_id, anon_1.child1_id AS anon_1_child1_id, "\
- "anon_1.parent_cls AS anon_1_parent_cls, anon_2.parent_id AS anon_2_parent_id, "\
- "anon_2.child2_id AS anon_2_child2_id, anon_2.parent_cls AS anon_2_parent_cls FROM "\
- "(SELECT parent.id AS parent_id, child1.id AS child1_id, parent.cls AS parent_cls FROM parent "\
+ "SELECT anon_1.child1_id AS anon_1_child1_id, anon_1.parent_id AS anon_1_parent_id, "\
+ "anon_1.parent_cls AS anon_1_parent_cls, anon_2.child2_id AS anon_2_child2_id, "\
+ "anon_2.parent_id AS anon_2_parent_id, anon_2.parent_cls AS anon_2_parent_cls FROM "\
+ "(SELECT child1.id AS child1_id, parent.id AS parent_id, parent.cls AS parent_cls FROM parent "\
"JOIN child1 ON parent.id = child1.id LIMIT 1) AS anon_1 LEFT OUTER JOIN secondary AS secondary_1 "\
"ON anon_1.parent_id = secondary_1.right_id LEFT OUTER JOIN (SELECT parent.id AS parent_id, "\
"parent.cls AS parent_cls, child2.id AS child2_id FROM parent JOIN child2 ON parent.id = child2.id) "\
@testing.resolve_artifact_names
def test_mapping_to_join_raises(self):
- """Test implicit merging of two cols warns."""
+ """Test implicit merging of two cols raises."""
usersaddresses = sa.join(users, addresses,
users.c.id == addresses.c.user_id)
assert_raises_message(
- sa.exc.SAWarning,
+ sa.exc.InvalidRequestError,
"Implicitly",
mapper, User, usersaddresses, primary_key=[users.c.id]
)
- sa.orm.clear_mappers()
-
- @testing.emits_warning(r'Implicitly')
- def go():
- # but it works despite the warning
- mapper(User, usersaddresses, primary_key=[users.c.id])
- l = create_session().query(User).order_by(users.c.id).all()
- eq_(l, self.static.user_result[:3])
- go()
@testing.resolve_artifact_names
def test_mapping_to_join(self):
usersaddresses = sa.join(users, addresses, users.c.id
== addresses.c.user_id)
mapper(User, usersaddresses, primary_key=[users.c.id],
- exclude_properties=[addresses.c.id])
+ #exclude_properties=[addresses.c.id]
+ properties={'add_id':addresses.c.id}
+ )
l = create_session().query(User).order_by(users.c.id).all()
eq_(l, self.static.user_result[:3])
testing.db,
lambda: c1.child2,
CompiledSQL(
- "SELECT base.id AS base_id, child2.id AS child2_id, base.type AS base_type "
+ "SELECT child2.id AS child2_id, base.id AS base_id, base.type AS base_type "
"FROM base JOIN child2 ON base.id = child2.id "
"WHERE base.id = :param_1",
{'param_1':4}
testing.db,
lambda: c1.child2,
CompiledSQL(
- "SELECT base.id AS base_id, child2.id AS child2_id, base.type AS base_type "
+ "SELECT child2.id AS child2_id, base.id AS base_id, base.type AS base_type "
"FROM base JOIN child2 ON base.id = child2.id WHERE base.id = :param_1",
# joinedload- this shouldn't happen
testing.db,
lambda: c1.child2,
CompiledSQL(
- "SELECT base.id AS base_id, child2.id AS child2_id, base.type AS base_type, "
+ "SELECT child2.id AS child2_id, base.id AS base_id, base.type AS base_type, "
"related_1.id AS related_1_id FROM base JOIN child2 ON base.id = child2.id "
"LEFT OUTER JOIN related AS related_1 ON base.id = related_1.id WHERE base.id = :param_1",
{'param_1':4}