From: Mike Bayer Date: Thu, 17 May 2012 14:57:31 +0000 (-0400) Subject: - add DeferredReflection to declarative itself X-Git-Tag: rel_0_8_0b1~424 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=da8032dc45ad8243323e9359f9b31efe1b7cfe5b;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - add DeferredReflection to declarative itself - split out test_declarative into four separate modules --- diff --git a/lib/sqlalchemy/ext/declarative.py b/lib/sqlalchemy/ext/declarative.py index 58e8e6de68..93df1c5935 100755 --- a/lib/sqlalchemy/ext/declarative.py +++ b/lib/sqlalchemy/ext/declarative.py @@ -1025,10 +1025,10 @@ from sqlalchemy.orm.util import polymorphic_union, _mapper_or_none __all__ = 'declarative_base', 'synonym_for', \ 'comparable_using', 'instrument_declarative' -def declared_mapping_info(cls): +def _declared_mapping_info(cls): # deferred mapping - if cls in _MapperThingy.thingies: - return _MapperThingy.thingies[cls] + if cls in _MapperConfig.configs: + return _MapperConfig.configs[cls] # regular mapping elif _is_mapped_class(cls): return class_mapper(cls, compile=False) @@ -1086,7 +1086,7 @@ def _as_declarative(cls, classname, dict_): ): return - class_mapped = declared_mapping_info(base) is not None + class_mapped = _declared_mapping_info(base) is not None if class_mapped: parent_columns = base.__table__.c.keys() @@ -1261,7 +1261,7 @@ def _as_declarative(cls, classname, dict_): if 'inherits' not in mapper_args: for c in cls.__bases__: - if declared_mapping_info(c) is not None: + if _declared_mapping_info(c) is not None: mapper_args['inherits'] = c break @@ -1278,7 +1278,7 @@ def _as_declarative(cls, classname, dict_): ) elif 'inherits' in mapper_args and not mapper_args.get('concrete', False): - inherited_mapper = declared_mapping_info(mapper_args['inherits']) + inherited_mapper = _declared_mapping_info(mapper_args['inherits']) inherited_table = inherited_mapper.local_table if table is None: @@ -1309,7 +1309,7 @@ def _as_declarative(cls, classname, dict_): # exclude any cols on the inherited table which are not mapped on the # parent class, to avoid # mapping columns specific to sibling/nephew classes - inherited_mapper = declared_mapping_info(mapper_args['inherits']) + inherited_mapper = _declared_mapping_info(mapper_args['inherits']) inherited_table = inherited_mapper.local_table if 'exclude_properties' not in mapper_args: @@ -1336,42 +1336,38 @@ def _as_declarative(cls, classname, dict_): # change this ordering when we do [ticket:1892] our_stuff[k] = p.columns + [col] - - mt = _MapperThingy(mapper_cls, - cls, table, our_stuff, mapper_args) + mt = _MapperConfig(mapper_cls, + cls, table, + properties=our_stuff, + **mapper_args) if not hasattr(cls, '__prepare__'): mt.map() -class _MapperThingy(object): - thingies = util.OrderedDict() +class _MapperConfig(object): + configs = util.OrderedDict() - def __init__(self, mapper_cls, cls, table, properties, mapper_args): + def __init__(self, mapper_cls, cls, table, **mapper_args): self.mapper_cls = mapper_cls self.cls = cls self.local_table = table - self.properties = properties self.mapper_args = mapper_args self._columntoproperty = set() if table is not None: self._columntoproperty.update(table.c) - self.thingies[cls] = self + self.configs[cls] = self + + @property + def args(self): + return self.cls, self.local_table, self.mapper_args def map(self): - self.thingies.pop(self.cls, None) + self.configs.pop(self.cls, None) self.cls.__mapper__ = self.mapper_cls( self.cls, self.local_table, - properties=self.properties, **self.mapper_args ) -def prepare_deferred_mapping(base, *arg, **kw): - to_map = set([m for m in _MapperThingy.thingies.values() - if issubclass(m.cls, base)]) - for thingy in to_map: - base.__prepare__(thingy, *arg, **kw) - thingy.map() - class DeclarativeMeta(type): def __init__(cls, classname, bases, dict_): if '_decl_class_registry' in cls.__dict__: @@ -1803,3 +1799,96 @@ class AbstractConcreteBase(ConcreteBase): sm = _mapper_or_none(scls) if sm.concrete and cls in scls.__bases__: sm._set_concrete_base(m) + + +class DeferredReflection(object): + """A helper class for construction of mappings based on + a deferred reflection step. + + Normally, declarative can be used with reflection by + setting a :class:`.Table` object using autoload=True + as the ``__table__`` attribute on a declarative class. + The caveat is that the :class:`.Table` must be fully + reflected, or at the very least have a primary key column, + at the point at which a normal declarative mapping is + constructed, meaning the :class:`.Engine` must be available + at class declaration time. + + The :class:`.DeferredReflection` mixin moves the construction + of mappers to be at a later point, after a specific + method is called which first reflects all :class:`.Table` + objects created so far. Classes can define it as such:: + + from sqlalchemy.ext.declarative import declarative_base, DeferredReflection + Base = declarative_base() + + class MyClass(DeferredReflection, Base): + __tablename__ = 'mytable' + + Above, ``MyClass`` is not yet mapped. After a series of + classes have been defined in the above fashion, all tables + can be reflected and mappings created using :meth:`.DeferredReflection.prepare`:: + + engine = create_engine("someengine://...") + DeferredReflection.prepare(engine) + + The :class:`.DeferredReflection` mixin can be applied to individual + classes, used as the base for the declarative base itself, + or used in a custom abstract class. Using an abstract base + allows that only a subset of classes to be prepared for a + particular prepare step, which is necessary for applications + that use more than one engine. For example, if an application + has two engines, you might use two bases, and prepare each + separately, e.g.:: + + class ReflectedOne(DeferredReflection, Base): + __abstract__ = True + + class ReflectedTwo(DeferredReflection, Base): + __abstract__ = True + + class MyClass(ReflectedOne): + __tablename__ = 'mytable' + + class MyOtherClass(ReflectedOne): + __tablename__ = 'myothertable' + + class YetAnotherClass(ReflectedTwo): + __tablename__ = 'yetanothertable' + + # ... etc. + + Above, the class hierarchies for ``ReflectedOne`` and + ``ReflectedTwo`` can be configured separately:: + + ReflectedOne.prepare(engine_one) + ReflectedTwo.prepare(engine_two) + + .. versionadded:: 0.8 + + """ + @classmethod + def prepare(cls, engine): + """Reflect all :class:`.Table` objects for all current + :class:`.DeferredReflection` subclasses""" + to_map = set([m for m in _MapperConfig.configs.values() + if issubclass(m.cls, cls)]) + for thingy in to_map: + cls.__prepare__(thingy.args, engine) + thingy.map() + + @classmethod + def __prepare__(cls, mapper_args, engine): + cls, local_table, args = mapper_args + # autoload Table, which is already + # present in the metadata. This + # will fill in db-loaded columns + # into the existing Table object. + if local_table is not None: + Table(local_table.name, + local_table.metadata, + extend_existing=True, + autoload_replace=False, + autoload=True, + autoload_with=engine, + schema=local_table.schema) diff --git a/test/ext/test_declarative.py b/test/ext/test_declarative.py index 5e185f6645..1d9f7f39bd 100644 --- a/test/ext/test_declarative.py +++ b/test/ext/test_declarative.py @@ -1329,2247 +1329,87 @@ class DeclarativeTest(DeclarativeTestBase): __tablename__ = 'b' id = Column(Integer, primary_key=True) -class DeclarativeInheritanceTest(DeclarativeTestBase): - def test_we_must_copy_mapper_args(self): - class Person(Base): - - __tablename__ = 'people' - id = Column(Integer, primary_key=True) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator, - 'polymorphic_identity': 'person'} - - class Engineer(Person): - - primary_language = Column(String(50)) - - assert 'inherits' not in Person.__mapper_args__ - assert class_mapper(Engineer).polymorphic_identity is None - assert class_mapper(Engineer).polymorphic_on is Person.__table__.c.type - - def test_we_must_only_copy_column_mapper_args(self): - - class Person(Base): - - __tablename__ = 'people' - id = Column(Integer, primary_key=True) - a=Column(Integer) - b=Column(Integer) - c=Column(Integer) - d=Column(Integer) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator, - 'polymorphic_identity': 'person', - 'version_id_col': 'a', - 'column_prefix': 'bar', - 'include_properties': ['id', 'a', 'b'], - } - assert class_mapper(Person).version_id_col == 'a' - assert class_mapper(Person).include_properties == set(['id', 'a', 'b']) - - - def test_custom_join_condition(self): - - class Foo(Base): - - __tablename__ = 'foo' - id = Column('id', Integer, primary_key=True) - - class Bar(Foo): - - __tablename__ = 'bar' - id = Column('id', Integer, primary_key=True) - foo_id = Column('foo_id', Integer) - __mapper_args__ = {'inherit_condition': foo_id == Foo.id} - - # compile succeeds because inherit_condition is honored - - configure_mappers() - - def test_joined(self): - - class Company(Base, fixtures.ComparableEntity): - - __tablename__ = 'companies' - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column('name', String(50)) - employees = relationship('Person') - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - company_id = Column('company_id', Integer, - ForeignKey('companies.id')) - name = Column('name', String(50)) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class Engineer(Person): - - __tablename__ = 'engineers' - __mapper_args__ = {'polymorphic_identity': 'engineer'} - id = Column('id', Integer, ForeignKey('people.id'), - primary_key=True) - primary_language = Column('primary_language', String(50)) - - class Manager(Person): - - __tablename__ = 'managers' - __mapper_args__ = {'polymorphic_identity': 'manager'} - id = Column('id', Integer, ForeignKey('people.id'), - primary_key=True) - golf_swing = Column('golf_swing', String(50)) - - Base.metadata.create_all() - sess = create_session() - c1 = Company(name='MegaCorp, Inc.', - employees=[Engineer(name='dilbert', - primary_language='java'), Engineer(name='wally', - primary_language='c++'), Manager(name='dogbert', - golf_swing='fore!')]) - c2 = Company(name='Elbonia, Inc.', - employees=[Engineer(name='vlad', - primary_language='cobol')]) - sess.add(c1) - sess.add(c2) - sess.flush() - sess.expunge_all() - eq_(sess.query(Company).filter(Company.employees.of_type(Engineer). - any(Engineer.primary_language - == 'cobol')).first(), c2) - - # ensure that the Manager mapper was compiled with the Manager id - # column as higher priority. this ensures that "Manager.id" - # is appropriately treated as the "id" column in the "manager" - # table (reversed from 0.6's behavior.) - - assert Manager.id.property.columns == [Manager.__table__.c.id, Person.__table__.c.id] - - # assert that the "id" column is available without a second - # load. as of 0.7, the ColumnProperty tests all columns - # in it's list to see which is present in the row. - - sess.expunge_all() - - def go(): - assert sess.query(Manager).filter(Manager.name == 'dogbert' - ).one().id - self.assert_sql_count(testing.db, go, 1) - sess.expunge_all() - - def go(): - assert sess.query(Person).filter(Manager.name == 'dogbert' - ).one().id - - self.assert_sql_count(testing.db, go, 1) - - def test_add_subcol_after_the_fact(self): - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column('name', String(50)) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class Engineer(Person): - - __tablename__ = 'engineers' - __mapper_args__ = {'polymorphic_identity': 'engineer'} - id = Column('id', Integer, ForeignKey('people.id'), - primary_key=True) - - Engineer.primary_language = Column('primary_language', - String(50)) - Base.metadata.create_all() - sess = create_session() - e1 = Engineer(primary_language='java', name='dilbert') - sess.add(e1) - sess.flush() - sess.expunge_all() - eq_(sess.query(Person).first(), Engineer(primary_language='java' - , name='dilbert')) - - def test_add_parentcol_after_the_fact(self): - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class Engineer(Person): - - __tablename__ = 'engineers' - __mapper_args__ = {'polymorphic_identity': 'engineer'} - primary_language = Column(String(50)) - id = Column('id', Integer, ForeignKey('people.id'), - primary_key=True) - - Person.name = Column('name', String(50)) - Base.metadata.create_all() - sess = create_session() - e1 = Engineer(primary_language='java', name='dilbert') - sess.add(e1) - sess.flush() - sess.expunge_all() - eq_(sess.query(Person).first(), - Engineer(primary_language='java', name='dilbert')) - - def test_add_sub_parentcol_after_the_fact(self): - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class Engineer(Person): - - __tablename__ = 'engineers' - __mapper_args__ = {'polymorphic_identity': 'engineer'} - primary_language = Column(String(50)) - id = Column('id', Integer, ForeignKey('people.id'), - primary_key=True) - - class Admin(Engineer): - - __tablename__ = 'admins' - __mapper_args__ = {'polymorphic_identity': 'admin'} - workstation = Column(String(50)) - id = Column('id', Integer, ForeignKey('engineers.id'), - primary_key=True) - - Person.name = Column('name', String(50)) - Base.metadata.create_all() - sess = create_session() - e1 = Admin(primary_language='java', name='dilbert', - workstation='foo') - sess.add(e1) - sess.flush() - sess.expunge_all() - eq_(sess.query(Person).first(), Admin(primary_language='java', - name='dilbert', workstation='foo')) - - def test_subclass_mixin(self): - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column('id', Integer, primary_key=True) - name = Column('name', String(50)) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class MyMixin(object): - - pass - - class Engineer(MyMixin, Person): - - __tablename__ = 'engineers' - __mapper_args__ = {'polymorphic_identity': 'engineer'} - id = Column('id', Integer, ForeignKey('people.id'), - primary_key=True) - primary_language = Column('primary_language', String(50)) - - assert class_mapper(Engineer).inherits is class_mapper(Person) - - @testing.fails_if(lambda: True, "Not implemented until 0.7") - def test_foreign_keys_with_col(self): - """Test that foreign keys that reference a literal 'id' subclass - 'id' attribute behave intuitively. - - See [ticket:1892]. - - """ - - class Booking(Base): - __tablename__ = 'booking' - id = Column(Integer, primary_key=True) - - class PlanBooking(Booking): - __tablename__ = 'plan_booking' - id = Column(Integer, ForeignKey(Booking.id), - primary_key=True) - - # referencing PlanBooking.id gives us the column - # on plan_booking, not booking - class FeatureBooking(Booking): - __tablename__ = 'feature_booking' - id = Column(Integer, ForeignKey(Booking.id), - primary_key=True) - plan_booking_id = Column(Integer, - ForeignKey(PlanBooking.id)) - - plan_booking = relationship(PlanBooking, - backref='feature_bookings') - - assert FeatureBooking.__table__.c.plan_booking_id.\ - references(PlanBooking.__table__.c.id) - - assert FeatureBooking.__table__.c.id.\ - references(Booking.__table__.c.id) - - def test_with_undefined_foreignkey(self): - - class Parent(Base): - - __tablename__ = 'parent' - id = Column('id', Integer, primary_key=True) - tp = Column('type', String(50)) - __mapper_args__ = dict(polymorphic_on=tp) - - class Child1(Parent): - - __tablename__ = 'child1' - id = Column('id', Integer, ForeignKey('parent.id'), - primary_key=True) - related_child2 = Column('c2', Integer, - ForeignKey('child2.id')) - __mapper_args__ = dict(polymorphic_identity='child1') - - # no exception is raised by the ForeignKey to "child2" even - # though child2 doesn't exist yet - - class Child2(Parent): - - __tablename__ = 'child2' - id = Column('id', Integer, ForeignKey('parent.id'), - primary_key=True) - related_child1 = Column('c1', Integer) - __mapper_args__ = dict(polymorphic_identity='child2') - - sa.orm.configure_mappers() # no exceptions here - - def test_foreign_keys_with_col(self): - """Test that foreign keys that reference a literal 'id' subclass - 'id' attribute behave intuitively. - - See [ticket:1892]. - - """ - - class Booking(Base): - __tablename__ = 'booking' - id = Column(Integer, primary_key=True) - - class PlanBooking(Booking): - __tablename__ = 'plan_booking' - id = Column(Integer, ForeignKey(Booking.id), - primary_key=True) - - # referencing PlanBooking.id gives us the column - # on plan_booking, not booking - class FeatureBooking(Booking): - __tablename__ = 'feature_booking' - id = Column(Integer, ForeignKey(Booking.id), - primary_key=True) - plan_booking_id = Column(Integer, - ForeignKey(PlanBooking.id)) - - plan_booking = relationship(PlanBooking, - backref='feature_bookings') - - assert FeatureBooking.__table__.c.plan_booking_id.\ - references(PlanBooking.__table__.c.id) - - assert FeatureBooking.__table__.c.id.\ - references(Booking.__table__.c.id) - - - def test_single_colsonbase(self): - """test single inheritance where all the columns are on the base - class.""" - - class Company(Base, fixtures.ComparableEntity): - - __tablename__ = 'companies' - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column('name', String(50)) - employees = relationship('Person') - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - company_id = Column('company_id', Integer, - ForeignKey('companies.id')) - name = Column('name', String(50)) - discriminator = Column('type', String(50)) - primary_language = Column('primary_language', String(50)) - golf_swing = Column('golf_swing', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class Engineer(Person): - - __mapper_args__ = {'polymorphic_identity': 'engineer'} - - class Manager(Person): - - __mapper_args__ = {'polymorphic_identity': 'manager'} - - Base.metadata.create_all() - sess = create_session() - c1 = Company(name='MegaCorp, Inc.', - employees=[Engineer(name='dilbert', - primary_language='java'), Engineer(name='wally', - primary_language='c++'), Manager(name='dogbert', - golf_swing='fore!')]) - c2 = Company(name='Elbonia, Inc.', - employees=[Engineer(name='vlad', - primary_language='cobol')]) - sess.add(c1) - sess.add(c2) - sess.flush() - sess.expunge_all() - eq_(sess.query(Person).filter(Engineer.primary_language - == 'cobol').first(), Engineer(name='vlad')) - eq_(sess.query(Company).filter(Company.employees.of_type(Engineer). - any(Engineer.primary_language - == 'cobol')).first(), c2) - - def test_single_colsonsub(self): - """test single inheritance where the columns are local to their - class. - - this is a newer usage. - - """ - - class Company(Base, fixtures.ComparableEntity): - - __tablename__ = 'companies' - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column('name', String(50)) - employees = relationship('Person') - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - company_id = Column(Integer, ForeignKey('companies.id')) - name = Column(String(50)) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class Engineer(Person): - - __mapper_args__ = {'polymorphic_identity': 'engineer'} - primary_language = Column(String(50)) - - class Manager(Person): - - __mapper_args__ = {'polymorphic_identity': 'manager'} - golf_swing = Column(String(50)) - - # we have here a situation that is somewhat unique. the Person - # class is mapped to the "people" table, but it was mapped when - # the table did not include the "primary_language" or - # "golf_swing" columns. declarative will also manipulate the - # exclude_properties collection so that sibling classes don't - # cross-pollinate. - - assert Person.__table__.c.company_id is not None - assert Person.__table__.c.golf_swing is not None - assert Person.__table__.c.primary_language is not None - assert Engineer.primary_language is not None - assert Manager.golf_swing is not None - assert not hasattr(Person, 'primary_language') - assert not hasattr(Person, 'golf_swing') - assert not hasattr(Engineer, 'golf_swing') - assert not hasattr(Manager, 'primary_language') - Base.metadata.create_all() - sess = create_session() - e1 = Engineer(name='dilbert', primary_language='java') - e2 = Engineer(name='wally', primary_language='c++') - m1 = Manager(name='dogbert', golf_swing='fore!') - c1 = Company(name='MegaCorp, Inc.', employees=[e1, e2, m1]) - e3 = Engineer(name='vlad', primary_language='cobol') - c2 = Company(name='Elbonia, Inc.', employees=[e3]) - sess.add(c1) - sess.add(c2) - sess.flush() - sess.expunge_all() - eq_(sess.query(Person).filter(Engineer.primary_language - == 'cobol').first(), Engineer(name='vlad')) - eq_(sess.query(Company).filter(Company.employees.of_type(Engineer). - any(Engineer.primary_language - == 'cobol')).first(), c2) - eq_(sess.query(Engineer).filter_by(primary_language='cobol' - ).one(), Engineer(name='vlad', primary_language='cobol')) - - def test_joined_from_single(self): - - class Company(Base, fixtures.ComparableEntity): - - __tablename__ = 'companies' - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column('name', String(50)) - employees = relationship('Person') - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - company_id = Column(Integer, ForeignKey('companies.id')) - name = Column(String(50)) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class Manager(Person): - - __mapper_args__ = {'polymorphic_identity': 'manager'} - golf_swing = Column(String(50)) - - class Engineer(Person): - - __tablename__ = 'engineers' - __mapper_args__ = {'polymorphic_identity': 'engineer'} - id = Column(Integer, ForeignKey('people.id'), - primary_key=True) - primary_language = Column(String(50)) - - assert Person.__table__.c.golf_swing is not None - assert not Person.__table__.c.has_key('primary_language') - assert Engineer.__table__.c.primary_language is not None - assert Engineer.primary_language is not None - assert Manager.golf_swing is not None - assert not hasattr(Person, 'primary_language') - assert not hasattr(Person, 'golf_swing') - assert not hasattr(Engineer, 'golf_swing') - assert not hasattr(Manager, 'primary_language') - Base.metadata.create_all() - sess = create_session() - e1 = Engineer(name='dilbert', primary_language='java') - e2 = Engineer(name='wally', primary_language='c++') - m1 = Manager(name='dogbert', golf_swing='fore!') - c1 = Company(name='MegaCorp, Inc.', employees=[e1, e2, m1]) - e3 = Engineer(name='vlad', primary_language='cobol') - c2 = Company(name='Elbonia, Inc.', employees=[e3]) - sess.add(c1) - sess.add(c2) - sess.flush() - sess.expunge_all() - eq_(sess.query(Person).with_polymorphic(Engineer). - filter(Engineer.primary_language - == 'cobol').first(), Engineer(name='vlad')) - eq_(sess.query(Company).filter(Company.employees.of_type(Engineer). - any(Engineer.primary_language - == 'cobol')).first(), c2) - eq_(sess.query(Engineer).filter_by(primary_language='cobol' - ).one(), Engineer(name='vlad', primary_language='cobol')) - - def test_polymorphic_on_converted_from_inst(self): - class A(Base): - __tablename__ = 'A' - id = Column(Integer, primary_key=True) - discriminator = Column(String) - - @declared_attr - def __mapper_args__(cls): - return { - 'polymorphic_identity': cls.__name__, - 'polymorphic_on': cls.discriminator - } - - class B(A): - pass - is_(B.__mapper__.polymorphic_on, A.__table__.c.discriminator) - - def test_add_deferred(self): - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - - Person.name = deferred(Column(String(10))) - Base.metadata.create_all() - sess = create_session() - p = Person(name='ratbert') - sess.add(p) - sess.flush() - sess.expunge_all() - eq_(sess.query(Person).all(), [Person(name='ratbert')]) - sess.expunge_all() - person = sess.query(Person).filter(Person.name == 'ratbert' - ).one() - assert 'name' not in person.__dict__ - - def test_single_fksonsub(self): - """test single inheritance with a foreign key-holding column on - a subclass. - - """ - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class Engineer(Person): - - __mapper_args__ = {'polymorphic_identity': 'engineer'} - primary_language_id = Column(Integer, - ForeignKey('languages.id')) - primary_language = relationship('Language') - - class Language(Base, fixtures.ComparableEntity): - - __tablename__ = 'languages' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - - assert not hasattr(Person, 'primary_language_id') - Base.metadata.create_all() - sess = create_session() - java, cpp, cobol = Language(name='java'), Language(name='cpp'), \ - Language(name='cobol') - e1 = Engineer(name='dilbert', primary_language=java) - e2 = Engineer(name='wally', primary_language=cpp) - e3 = Engineer(name='vlad', primary_language=cobol) - sess.add_all([e1, e2, e3]) - sess.flush() - sess.expunge_all() - eq_(sess.query(Person).filter(Engineer.primary_language.has( - Language.name - == 'cobol')).first(), Engineer(name='vlad', - primary_language=Language(name='cobol'))) - eq_(sess.query(Engineer).filter(Engineer.primary_language.has( - Language.name - == 'cobol')).one(), Engineer(name='vlad', - primary_language=Language(name='cobol'))) - eq_(sess.query(Person).join(Engineer.primary_language).order_by( - Language.name).all(), - [Engineer(name='vlad', - primary_language=Language(name='cobol')), - Engineer(name='wally', primary_language=Language(name='cpp' - )), Engineer(name='dilbert', - primary_language=Language(name='java'))]) - - def test_single_three_levels(self): - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column(Integer, primary_key=True) - name = Column(String(50)) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class Engineer(Person): - - __mapper_args__ = {'polymorphic_identity': 'engineer'} - primary_language = Column(String(50)) - - class JuniorEngineer(Engineer): - - __mapper_args__ = \ - {'polymorphic_identity': 'junior_engineer'} - nerf_gun = Column(String(50)) - - class Manager(Person): - - __mapper_args__ = {'polymorphic_identity': 'manager'} - golf_swing = Column(String(50)) - - assert JuniorEngineer.nerf_gun - assert JuniorEngineer.primary_language - assert JuniorEngineer.name - assert Manager.golf_swing - assert Engineer.primary_language - assert not hasattr(Engineer, 'golf_swing') - assert not hasattr(Engineer, 'nerf_gun') - assert not hasattr(Manager, 'nerf_gun') - assert not hasattr(Manager, 'primary_language') - - def test_single_detects_conflict(self): - - class Person(Base): - - __tablename__ = 'people' - id = Column(Integer, primary_key=True) - name = Column(String(50)) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - class Engineer(Person): - - __mapper_args__ = {'polymorphic_identity': 'engineer'} - primary_language = Column(String(50)) - - # test sibling col conflict - - def go(): - - class Manager(Person): - - __mapper_args__ = {'polymorphic_identity': 'manager'} - golf_swing = Column(String(50)) - primary_language = Column(String(50)) - - assert_raises(sa.exc.ArgumentError, go) - - # test parent col conflict - - def go(): - - class Salesman(Person): - - __mapper_args__ = {'polymorphic_identity': 'manager'} - name = Column(String(50)) - - assert_raises(sa.exc.ArgumentError, go) - - def test_single_no_special_cols(self): - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column('id', Integer, primary_key=True) - name = Column('name', String(50)) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - def go(): - - class Engineer(Person): - - __mapper_args__ = {'polymorphic_identity': 'engineer'} - primary_language = Column('primary_language', - String(50)) - foo_bar = Column(Integer, primary_key=True) - - assert_raises_message(sa.exc.ArgumentError, 'place primary key' - , go) - - def test_single_no_table_args(self): - - class Person(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column('id', Integer, primary_key=True) - name = Column('name', String(50)) - discriminator = Column('type', String(50)) - __mapper_args__ = {'polymorphic_on': discriminator} - - def go(): - - class Engineer(Person): - - __mapper_args__ = {'polymorphic_identity': 'engineer'} - primary_language = Column('primary_language', - String(50)) - - # this should be on the Person class, as this is single - # table inheritance, which is why we test that this - # throws an exception! - - __table_args__ = {'mysql_engine': 'InnoDB'} - - assert_raises_message(sa.exc.ArgumentError, - 'place __table_args__', go) - - @testing.emits_warning("The classname") - def test_dupe_name_in_hierarchy(self): - class A(Base): - __tablename__ = "a" - id = Column( Integer, primary_key=True) - a_1 = A - class A(a_1): - __tablename__ = 'b' - - id = Column(Integer(),ForeignKey(a_1.id), primary_key = True) - - assert A.__mapper__.inherits is a_1.__mapper__ - -from test.orm.test_events import _RemoveListeners -class ConcreteInhTest(_RemoveListeners, DeclarativeTestBase): - def _roundtrip(self, Employee, Manager, Engineer, Boss, polymorphic=True): - Base.metadata.create_all() - sess = create_session() - e1 = Engineer(name='dilbert', primary_language='java') - e2 = Engineer(name='wally', primary_language='c++') - m1 = Manager(name='dogbert', golf_swing='fore!') - e3 = Engineer(name='vlad', primary_language='cobol') - b1 = Boss(name="pointy haired") - sess.add_all([e1, e2, m1, e3, b1]) - sess.flush() - sess.expunge_all() - if polymorphic: - eq_(sess.query(Employee).order_by(Employee.name).all(), - [Engineer(name='dilbert'), Manager(name='dogbert'), - Boss(name='pointy haired'), Engineer(name='vlad'), Engineer(name='wally')]) - else: - eq_(sess.query(Engineer).order_by(Engineer.name).all(), - [Engineer(name='dilbert'), Engineer(name='vlad'), - Engineer(name='wally')]) - eq_(sess.query(Manager).all(), [Manager(name='dogbert')]) - eq_(sess.query(Boss).all(), [Boss(name='pointy haired')]) - - - def test_explicit(self): - engineers = Table('engineers', Base.metadata, Column('id', - Integer, primary_key=True, - test_needs_autoincrement=True), Column('name' - , String(50)), Column('primary_language', - String(50))) - managers = Table('managers', Base.metadata, - Column('id',Integer, primary_key=True, test_needs_autoincrement=True), - Column('name', String(50)), - Column('golf_swing', String(50)) - ) - boss = Table('boss', Base.metadata, - Column('id',Integer, primary_key=True, test_needs_autoincrement=True), - Column('name', String(50)), - Column('golf_swing', String(50)) - ) - punion = polymorphic_union({ - 'engineer': engineers, - 'manager' : managers, - 'boss': boss}, 'type', 'punion') - - class Employee(Base, fixtures.ComparableEntity): - - __table__ = punion - __mapper_args__ = {'polymorphic_on': punion.c.type} - - class Engineer(Employee): - - __table__ = engineers - __mapper_args__ = {'polymorphic_identity': 'engineer', - 'concrete': True} - - class Manager(Employee): - - __table__ = managers - __mapper_args__ = {'polymorphic_identity': 'manager', - 'concrete': True} - - class Boss(Manager): - __table__ = boss - __mapper_args__ = {'polymorphic_identity': 'boss', - 'concrete': True} - - self._roundtrip(Employee, Manager, Engineer, Boss) - - def test_concrete_inline_non_polymorphic(self): - """test the example from the declarative docs.""" - - class Employee(Base, fixtures.ComparableEntity): - - __tablename__ = 'people' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - - class Engineer(Employee): - - __tablename__ = 'engineers' - __mapper_args__ = {'concrete': True} - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - primary_language = Column(String(50)) - name = Column(String(50)) - - class Manager(Employee): - - __tablename__ = 'manager' - __mapper_args__ = {'concrete': True} - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - golf_swing = Column(String(50)) - name = Column(String(50)) - - class Boss(Manager): - __tablename__ = 'boss' - __mapper_args__ = {'concrete': True} - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - golf_swing = Column(String(50)) - name = Column(String(50)) - - self._roundtrip(Employee, Manager, Engineer, Boss, polymorphic=False) - - def test_abstract_concrete_extension(self): - class Employee(AbstractConcreteBase, Base, fixtures.ComparableEntity): - pass - - class Manager(Employee): - __tablename__ = 'manager' - employee_id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - golf_swing = Column(String(40)) - __mapper_args__ = { - 'polymorphic_identity':'manager', - 'concrete':True} - - class Boss(Manager): - __tablename__ = 'boss' - employee_id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - golf_swing = Column(String(40)) - __mapper_args__ = { - 'polymorphic_identity':'boss', - 'concrete':True} - - class Engineer(Employee): - __tablename__ = 'engineer' - employee_id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - primary_language = Column(String(40)) - __mapper_args__ = {'polymorphic_identity':'engineer', - 'concrete':True} - - self._roundtrip(Employee, Manager, Engineer, Boss) - - def test_concrete_extension(self): - class Employee(ConcreteBase, Base, fixtures.ComparableEntity): - __tablename__ = 'employee' - employee_id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - __mapper_args__ = { - 'polymorphic_identity':'employee', - 'concrete':True} - class Manager(Employee): - __tablename__ = 'manager' - employee_id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - golf_swing = Column(String(40)) - __mapper_args__ = { - 'polymorphic_identity':'manager', - 'concrete':True} - - class Boss(Manager): - __tablename__ = 'boss' - employee_id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - golf_swing = Column(String(40)) - __mapper_args__ = { - 'polymorphic_identity':'boss', - 'concrete':True} - - class Engineer(Employee): - __tablename__ = 'engineer' - employee_id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - primary_language = Column(String(40)) - __mapper_args__ = {'polymorphic_identity':'engineer', - 'concrete':True} - self._roundtrip(Employee, Manager, Engineer, Boss) - - -def _produce_test(inline, stringbased): - - class ExplicitJoinTest(fixtures.MappedTest): - - @classmethod - def define_tables(cls, metadata): - global User, Address - Base = decl.declarative_base(metadata=metadata) - - class User(Base, fixtures.ComparableEntity): - - __tablename__ = 'users' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - name = Column(String(50)) - - class Address(Base, fixtures.ComparableEntity): - - __tablename__ = 'addresses' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - email = Column(String(50)) - user_id = Column(Integer, ForeignKey('users.id')) - if inline: - if stringbased: - user = relationship('User', - primaryjoin='User.id==Address.user_id', - backref='addresses') - else: - user = relationship(User, primaryjoin=User.id - == user_id, backref='addresses') - - if not inline: - configure_mappers() - if stringbased: - Address.user = relationship('User', - primaryjoin='User.id==Address.user_id', - backref='addresses') - else: - Address.user = relationship(User, - primaryjoin=User.id == Address.user_id, - backref='addresses') - - @classmethod - def insert_data(cls): - params = [dict(zip(('id', 'name'), column_values)) - for column_values in [(7, 'jack'), (8, 'ed'), (9, - 'fred'), (10, 'chuck')]] - User.__table__.insert().execute(params) - Address.__table__.insert().execute([dict(zip(('id', - 'user_id', 'email'), column_values)) - for column_values in [(1, 7, 'jack@bean.com'), (2, - 8, 'ed@wood.com'), (3, 8, 'ed@bettyboop.com'), (4, - 8, 'ed@lala.com'), (5, 9, 'fred@fred.com')]]) - - def test_aliased_join(self): - - # this query will screw up if the aliasing enabled in - # query.join() gets applied to the right half of the join - # condition inside the any(). the join condition inside of - # any() comes from the "primaryjoin" of the relationship, - # and should not be annotated with _orm_adapt. - # PropertyLoader.Comparator will annotate the left side with - # _orm_adapt, though. - - sess = create_session() - eq_(sess.query(User).join(User.addresses, - aliased=True).filter(Address.email == 'ed@wood.com' - ).filter(User.addresses.any(Address.email - == 'jack@bean.com')).all(), []) - - ExplicitJoinTest.__name__ = 'ExplicitJoinTest%s%s' % (inline - and 'Inline' or 'Separate', stringbased and 'String' - or 'Literal') - return ExplicitJoinTest - -for inline in True, False: - for stringbased in True, False: - testclass = _produce_test(inline, stringbased) - exec '%s = testclass' % testclass.__name__ - del testclass - -class DeclarativeReflectionTest(fixtures.TestBase): - - @classmethod - def setup_class(cls): - global reflection_metadata - reflection_metadata = MetaData(testing.db) - Table('users', reflection_metadata, Column('id', Integer, - primary_key=True, test_needs_autoincrement=True), - Column('name', String(50)), test_needs_fk=True) - Table( - 'addresses', - reflection_metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('email', String(50)), - Column('user_id', Integer, ForeignKey('users.id')), - test_needs_fk=True, - ) - Table( - 'imhandles', - reflection_metadata, - Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('user_id', Integer), - Column('network', String(50)), - Column('handle', String(50)), - test_needs_fk=True, - ) - reflection_metadata.create_all() - - def setup(self): - global Base - Base = decl.declarative_base(testing.db) - - def teardown(self): - for t in reversed(reflection_metadata.sorted_tables): - t.delete().execute() - - @classmethod - def teardown_class(cls): - reflection_metadata.drop_all() - - def test_basic(self): - meta = MetaData(testing.db) - - class User(Base, fixtures.ComparableEntity): - - __tablename__ = 'users' - __autoload__ = True - if testing.against('oracle', 'firebird'): - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - addresses = relationship('Address', backref='user') - - class Address(Base, fixtures.ComparableEntity): - - __tablename__ = 'addresses' - __autoload__ = True - if testing.against('oracle', 'firebird'): - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - - u1 = User(name='u1', addresses=[Address(email='one'), - Address(email='two')]) - sess = create_session() - sess.add(u1) - sess.flush() - sess.expunge_all() - eq_(sess.query(User).all(), [User(name='u1', - addresses=[Address(email='one'), Address(email='two')])]) - a1 = sess.query(Address).filter(Address.email == 'two').one() - eq_(a1, Address(email='two')) - eq_(a1.user, User(name='u1')) - - def test_rekey(self): - meta = MetaData(testing.db) - - class User(Base, fixtures.ComparableEntity): - - __tablename__ = 'users' - __autoload__ = True - if testing.against('oracle', 'firebird'): - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - nom = Column('name', String(50), key='nom') - addresses = relationship('Address', backref='user') - - class Address(Base, fixtures.ComparableEntity): - - __tablename__ = 'addresses' - __autoload__ = True - if testing.against('oracle', 'firebird'): - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - - u1 = User(nom='u1', addresses=[Address(email='one'), - Address(email='two')]) - sess = create_session() - sess.add(u1) - sess.flush() - sess.expunge_all() - eq_(sess.query(User).all(), [User(nom='u1', - addresses=[Address(email='one'), Address(email='two')])]) - a1 = sess.query(Address).filter(Address.email == 'two').one() - eq_(a1, Address(email='two')) - eq_(a1.user, User(nom='u1')) - assert_raises(TypeError, User, name='u3') - - def test_supplied_fk(self): - meta = MetaData(testing.db) - - class IMHandle(Base, fixtures.ComparableEntity): - - __tablename__ = 'imhandles' - __autoload__ = True - if testing.against('oracle', 'firebird'): - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - user_id = Column('user_id', Integer, ForeignKey('users.id')) - - class User(Base, fixtures.ComparableEntity): - - __tablename__ = 'users' - __autoload__ = True - if testing.against('oracle', 'firebird'): - id = Column('id', Integer, primary_key=True, - test_needs_autoincrement=True) - handles = relationship('IMHandle', backref='user') - - u1 = User(name='u1', handles=[IMHandle(network='blabber', - handle='foo'), IMHandle(network='lol', handle='zomg' - )]) - sess = create_session() - sess.add(u1) - sess.flush() - sess.expunge_all() - eq_(sess.query(User).all(), [User(name='u1', - handles=[IMHandle(network='blabber', handle='foo'), - IMHandle(network='lol', handle='zomg')])]) - a1 = sess.query(IMHandle).filter(IMHandle.handle == 'zomg' - ).one() - eq_(a1, IMHandle(network='lol', handle='zomg')) - eq_(a1.user, User(name='u1')) - -class DeclarativeMixinTest(DeclarativeTestBase): - - def test_simple(self): - - class MyMixin(object): - - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - - def foo(self): - return 'bar' + str(self.id) - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - name = Column(String(100), nullable=False, index=True) - - Base.metadata.create_all() - session = create_session() - session.add(MyModel(name='testing')) - session.flush() - session.expunge_all() - obj = session.query(MyModel).one() - eq_(obj.id, 1) - eq_(obj.name, 'testing') - eq_(obj.foo(), 'bar1') - - def test_unique_column(self): - - class MyMixin(object): - - id = Column(Integer, primary_key=True) - value = Column(String, unique=True) - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - - assert MyModel.__table__.c.value.unique - - def test_hierarchical_bases(self): - - class MyMixinParent: - - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - - def foo(self): - return 'bar' + str(self.id) - - class MyMixin(MyMixinParent): - - baz = Column(String(100), nullable=False, index=True) - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - name = Column(String(100), nullable=False, index=True) - - Base.metadata.create_all() - session = create_session() - session.add(MyModel(name='testing', baz='fu')) - session.flush() - session.expunge_all() - obj = session.query(MyModel).one() - eq_(obj.id, 1) - eq_(obj.name, 'testing') - eq_(obj.foo(), 'bar1') - eq_(obj.baz, 'fu') - - def test_mixin_overrides(self): - """test a mixin that overrides a column on a superclass.""" - - class MixinA(object): - foo = Column(String(50)) - - class MixinB(MixinA): - foo = Column(Integer) - - class MyModelA(Base, MixinA): - __tablename__ = 'testa' - id = Column(Integer, primary_key=True) - - class MyModelB(Base, MixinB): - __tablename__ = 'testb' - id = Column(Integer, primary_key=True) - - eq_(MyModelA.__table__.c.foo.type.__class__, String) - eq_(MyModelB.__table__.c.foo.type.__class__, Integer) - - - def test_not_allowed(self): - - class MyMixin: - foo = Column(Integer, ForeignKey('bar.id')) - - def go(): - class MyModel(Base, MyMixin): - __tablename__ = 'foo' - - assert_raises(sa.exc.InvalidRequestError, go) - - class MyRelMixin: - foo = relationship('Bar') - - def go(): - class MyModel(Base, MyRelMixin): - - __tablename__ = 'foo' - - assert_raises(sa.exc.InvalidRequestError, go) - - class MyDefMixin: - foo = deferred(Column('foo', String)) - - def go(): - class MyModel(Base, MyDefMixin): - __tablename__ = 'foo' - - assert_raises(sa.exc.InvalidRequestError, go) - - class MyCPropMixin: - foo = column_property(Column('foo', String)) - - def go(): - class MyModel(Base, MyCPropMixin): - __tablename__ = 'foo' - - assert_raises(sa.exc.InvalidRequestError, go) - - def test_table_name_inherited(self): - - class MyMixin: - @declared_attr - def __tablename__(cls): - return cls.__name__.lower() - id = Column(Integer, primary_key=True) - - class MyModel(Base, MyMixin): - pass - - eq_(MyModel.__table__.name, 'mymodel') - - def test_classproperty_still_works(self): - class MyMixin(object): - @classproperty - def __tablename__(cls): - return cls.__name__.lower() - id = Column(Integer, primary_key=True) - - class MyModel(Base, MyMixin): - __tablename__ = 'overridden' - - eq_(MyModel.__table__.name, 'overridden') - - def test_table_name_not_inherited(self): - - class MyMixin: - @declared_attr - def __tablename__(cls): - return cls.__name__.lower() - id = Column(Integer, primary_key=True) - - class MyModel(Base, MyMixin): - __tablename__ = 'overridden' - - eq_(MyModel.__table__.name, 'overridden') - - def test_table_name_inheritance_order(self): - - class MyMixin1: - @declared_attr - def __tablename__(cls): - return cls.__name__.lower() + '1' - - class MyMixin2: - @declared_attr - def __tablename__(cls): - return cls.__name__.lower() + '2' - - class MyModel(Base, MyMixin1, MyMixin2): - id = Column(Integer, primary_key=True) - - eq_(MyModel.__table__.name, 'mymodel1') - - def test_table_name_dependent_on_subclass(self): - - class MyHistoryMixin: - @declared_attr - def __tablename__(cls): - return cls.parent_name + '_changelog' - - class MyModel(Base, MyHistoryMixin): - parent_name = 'foo' - id = Column(Integer, primary_key=True) - - eq_(MyModel.__table__.name, 'foo_changelog') - - def test_table_args_inherited(self): - - class MyMixin: - __table_args__ = {'mysql_engine': 'InnoDB'} - - class MyModel(Base, MyMixin): - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - - eq_(MyModel.__table__.kwargs, {'mysql_engine': 'InnoDB'}) - - def test_table_args_inherited_descriptor(self): - - class MyMixin: - @declared_attr - def __table_args__(cls): - return {'info': cls.__name__} - - class MyModel(Base, MyMixin): - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - - eq_(MyModel.__table__.info, 'MyModel') - - def test_table_args_inherited_single_table_inheritance(self): - - class MyMixin: - __table_args__ = {'mysql_engine': 'InnoDB'} - - class General(Base, MyMixin): - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - type_ = Column(String(50)) - __mapper__args = {'polymorphic_on': type_} - - class Specific(General): - __mapper_args__ = {'polymorphic_identity': 'specific'} - - assert Specific.__table__ is General.__table__ - eq_(General.__table__.kwargs, {'mysql_engine': 'InnoDB'}) - - def test_columns_single_table_inheritance(self): - """Test a column on a mixin with an alternate attribute name, - mapped to a superclass and single-table inheritance subclass. - The superclass table gets the column, the subclass shares - the MapperProperty. - - """ - - class MyMixin(object): - foo = Column('foo', Integer) - bar = Column('bar_newname', Integer) - - class General(Base, MyMixin): - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - type_ = Column(String(50)) - __mapper__args = {'polymorphic_on': type_} - - class Specific(General): - __mapper_args__ = {'polymorphic_identity': 'specific'} - - assert General.bar.prop.columns[0] is General.__table__.c.bar_newname - assert len(General.bar.prop.columns) == 1 - assert Specific.bar.prop is General.bar.prop - - def test_columns_joined_table_inheritance(self): - """Test a column on a mixin with an alternate attribute name, - mapped to a superclass and joined-table inheritance subclass. - Both tables get the column, in the case of the subclass the two - columns are joined under one MapperProperty. - - """ - - class MyMixin(object): - foo = Column('foo', Integer) - bar = Column('bar_newname', Integer) - - class General(Base, MyMixin): - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - type_ = Column(String(50)) - __mapper__args = {'polymorphic_on': type_} - - class Specific(General): - __tablename__ = 'sub' - id = Column(Integer, ForeignKey('test.id'), primary_key=True) - __mapper_args__ = {'polymorphic_identity': 'specific'} - - assert General.bar.prop.columns[0] is General.__table__.c.bar_newname - assert len(General.bar.prop.columns) == 1 - assert Specific.bar.prop is not General.bar.prop - assert len(Specific.bar.prop.columns) == 2 - assert Specific.bar.prop.columns[0] is General.__table__.c.bar_newname - assert Specific.bar.prop.columns[1] is Specific.__table__.c.bar_newname - - def test_column_join_checks_superclass_type(self): - """Test that the logic which joins subclass props to those - of the superclass checks that the superclass property is a column. - - """ - - class General(Base): - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - general_id = Column(Integer, ForeignKey('test.id')) - type_ = relationship("General") - - class Specific(General): - __tablename__ = 'sub' - id = Column(Integer, ForeignKey('test.id'), primary_key=True) - type_ = Column('foob', String(50)) - - assert isinstance(General.type_.property, sa.orm.RelationshipProperty) - assert Specific.type_.property.columns[0] is Specific.__table__.c.foob - - def test_column_join_checks_subclass_type(self): - """Test that the logic which joins subclass props to those - of the superclass checks that the subclass property is a column. - - """ - - def go(): - class General(Base): - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - type_ = Column('foob', Integer) - - class Specific(General): - __tablename__ = 'sub' - id = Column(Integer, ForeignKey('test.id'), primary_key=True) - specific_id = Column(Integer, ForeignKey('sub.id')) - type_ = relationship("Specific") - assert_raises_message( - sa.exc.ArgumentError, "column 'foob' conflicts with property", go - ) - - def test_table_args_overridden(self): - - class MyMixin: - __table_args__ = {'mysql_engine': 'Foo'} - - class MyModel(Base, MyMixin): - __tablename__ = 'test' - __table_args__ = {'mysql_engine': 'InnoDB'} - id = Column(Integer, primary_key=True) - - eq_(MyModel.__table__.kwargs, {'mysql_engine': 'InnoDB'}) - - def test_mapper_args_declared_attr(self): - - class ComputedMapperArgs: - @declared_attr - def __mapper_args__(cls): - if cls.__name__ == 'Person': - return {'polymorphic_on': cls.discriminator} - else: - return {'polymorphic_identity': cls.__name__} - - class Person(Base, ComputedMapperArgs): - __tablename__ = 'people' - id = Column(Integer, primary_key=True) - discriminator = Column('type', String(50)) - - class Engineer(Person): - pass - - configure_mappers() - assert class_mapper(Person).polymorphic_on \ - is Person.__table__.c.type - eq_(class_mapper(Engineer).polymorphic_identity, 'Engineer') - - def test_mapper_args_declared_attr_two(self): - - # same as test_mapper_args_declared_attr, but we repeat - # ComputedMapperArgs on both classes for no apparent reason. - - class ComputedMapperArgs: - @declared_attr - def __mapper_args__(cls): - if cls.__name__ == 'Person': - return {'polymorphic_on': cls.discriminator} - else: - return {'polymorphic_identity': cls.__name__} - - class Person(Base, ComputedMapperArgs): - - __tablename__ = 'people' - id = Column(Integer, primary_key=True) - discriminator = Column('type', String(50)) - - class Engineer(Person, ComputedMapperArgs): - pass - - configure_mappers() - assert class_mapper(Person).polymorphic_on \ - is Person.__table__.c.type - eq_(class_mapper(Engineer).polymorphic_identity, 'Engineer') - - def test_table_args_composite(self): - - class MyMixin1: - - __table_args__ = {'info': {'baz': 'bob'}} - - class MyMixin2: - - __table_args__ = {'info': {'foo': 'bar'}} - - class MyModel(Base, MyMixin1, MyMixin2): - - __tablename__ = 'test' - - @declared_attr - def __table_args__(self): - info = {} - args = dict(info=info) - info.update(MyMixin1.__table_args__['info']) - info.update(MyMixin2.__table_args__['info']) - return args - id = Column(Integer, primary_key=True) - - eq_(MyModel.__table__.info, {'foo': 'bar', 'baz': 'bob'}) - - def test_mapper_args_inherited(self): - - class MyMixin: - - __mapper_args__ = {'always_refresh': True} - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - - eq_(MyModel.__mapper__.always_refresh, True) - - def test_mapper_args_inherited_descriptor(self): - - class MyMixin: - - @declared_attr - def __mapper_args__(cls): - - # tenuous, but illustrates the problem! - - if cls.__name__ == 'MyModel': - return dict(always_refresh=True) - else: - return dict(always_refresh=False) - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - - eq_(MyModel.__mapper__.always_refresh, True) - - def test_mapper_args_polymorphic_on_inherited(self): - - class MyMixin: - - type_ = Column(String(50)) - __mapper_args__ = {'polymorphic_on': type_} - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - - col = MyModel.__mapper__.polymorphic_on - eq_(col.name, 'type_') - assert col.table is not None - - def test_mapper_args_overridden(self): - - class MyMixin: - - __mapper_args__ = dict(always_refresh=True) - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - __mapper_args__ = dict(always_refresh=False) - id = Column(Integer, primary_key=True) - - eq_(MyModel.__mapper__.always_refresh, False) - - def test_mapper_args_composite(self): - - class MyMixin1: - - type_ = Column(String(50)) - __mapper_args__ = {'polymorphic_on': type_} - - class MyMixin2: - - __mapper_args__ = {'always_refresh': True} - - class MyModel(Base, MyMixin1, MyMixin2): - - __tablename__ = 'test' - - @declared_attr - def __mapper_args__(cls): - args = {} - args.update(MyMixin1.__mapper_args__) - args.update(MyMixin2.__mapper_args__) - if cls.__name__ != 'MyModel': - args.pop('polymorphic_on') - args['polymorphic_identity'] = cls.__name__ - - return args - id = Column(Integer, primary_key=True) - - class MySubModel(MyModel): - pass - - eq_( - MyModel.__mapper__.polymorphic_on.name, - 'type_' - ) - assert MyModel.__mapper__.polymorphic_on.table is not None - eq_(MyModel.__mapper__.always_refresh, True) - eq_(MySubModel.__mapper__.always_refresh, True) - eq_(MySubModel.__mapper__.polymorphic_identity, 'MySubModel') - - def test_mapper_args_property(self): - class MyModel(Base): - - @declared_attr - def __tablename__(cls): - return cls.__name__.lower() - - @declared_attr - def __table_args__(cls): - return {'mysql_engine':'InnoDB'} - - @declared_attr - def __mapper_args__(cls): - args = {} - args['polymorphic_identity'] = cls.__name__ - return args - id = Column(Integer, primary_key=True) - - class MySubModel(MyModel): - id = Column(Integer, ForeignKey('mymodel.id'), primary_key=True) - - class MySubModel2(MyModel): - __tablename__ = 'sometable' - id = Column(Integer, ForeignKey('mymodel.id'), primary_key=True) - - eq_(MyModel.__mapper__.polymorphic_identity, 'MyModel') - eq_(MySubModel.__mapper__.polymorphic_identity, 'MySubModel') - eq_(MyModel.__table__.kwargs['mysql_engine'], 'InnoDB') - eq_(MySubModel.__table__.kwargs['mysql_engine'], 'InnoDB') - eq_(MySubModel2.__table__.kwargs['mysql_engine'], 'InnoDB') - eq_(MyModel.__table__.name, 'mymodel') - eq_(MySubModel.__table__.name, 'mysubmodel') - - def test_mapper_args_custom_base(self): - """test the @declared_attr approach from a custom base.""" - - class Base(object): - @declared_attr - def __tablename__(cls): - return cls.__name__.lower() - - @declared_attr - def __table_args__(cls): - return {'mysql_engine':'InnoDB'} - - @declared_attr - def id(self): - return Column(Integer, primary_key=True) - - Base = decl.declarative_base(cls=Base) - - class MyClass(Base): - pass - - class MyOtherClass(Base): - pass - - eq_(MyClass.__table__.kwargs['mysql_engine'], 'InnoDB') - eq_(MyClass.__table__.name, 'myclass') - eq_(MyOtherClass.__table__.name, 'myotherclass') - assert MyClass.__table__.c.id.table is MyClass.__table__ - assert MyOtherClass.__table__.c.id.table is MyOtherClass.__table__ - - def test_single_table_no_propagation(self): - - class IdColumn: - - id = Column(Integer, primary_key=True) - - class Generic(Base, IdColumn): - - __tablename__ = 'base' - discriminator = Column('type', String(50)) - __mapper_args__ = dict(polymorphic_on=discriminator) - value = Column(Integer()) - - class Specific(Generic): - - __mapper_args__ = dict(polymorphic_identity='specific') - - assert Specific.__table__ is Generic.__table__ - eq_(Generic.__table__.c.keys(), ['id', 'type', 'value']) - assert class_mapper(Specific).polymorphic_on \ - is Generic.__table__.c.type - eq_(class_mapper(Specific).polymorphic_identity, 'specific') - - def test_joined_table_propagation(self): - - class CommonMixin: - - @declared_attr - def __tablename__(cls): - return cls.__name__.lower() - __table_args__ = {'mysql_engine': 'InnoDB'} - timestamp = Column(Integer) - id = Column(Integer, primary_key=True) - - class Generic(Base, CommonMixin): - - discriminator = Column('python_type', String(50)) - __mapper_args__ = dict(polymorphic_on=discriminator) - - class Specific(Generic): - - __mapper_args__ = dict(polymorphic_identity='specific') - id = Column(Integer, ForeignKey('generic.id'), - primary_key=True) - - eq_(Generic.__table__.name, 'generic') - eq_(Specific.__table__.name, 'specific') - eq_(Generic.__table__.c.keys(), ['timestamp', 'id', - 'python_type']) - eq_(Specific.__table__.c.keys(), ['timestamp', 'id']) - eq_(Generic.__table__.kwargs, {'mysql_engine': 'InnoDB'}) - eq_(Specific.__table__.kwargs, {'mysql_engine': 'InnoDB'}) - - def test_some_propagation(self): - - class CommonMixin: - - @declared_attr - def __tablename__(cls): - return cls.__name__.lower() - __table_args__ = {'mysql_engine': 'InnoDB'} - timestamp = Column(Integer) - - class BaseType(Base, CommonMixin): - - discriminator = Column('type', String(50)) - __mapper_args__ = dict(polymorphic_on=discriminator) - id = Column(Integer, primary_key=True) - value = Column(Integer()) - - class Single(BaseType): - - __tablename__ = None - __mapper_args__ = dict(polymorphic_identity='type1') - - class Joined(BaseType): - - __mapper_args__ = dict(polymorphic_identity='type2') - id = Column(Integer, ForeignKey('basetype.id'), - primary_key=True) - - eq_(BaseType.__table__.name, 'basetype') - eq_(BaseType.__table__.c.keys(), ['timestamp', 'type', 'id', - 'value']) - eq_(BaseType.__table__.kwargs, {'mysql_engine': 'InnoDB'}) - assert Single.__table__ is BaseType.__table__ - eq_(Joined.__table__.name, 'joined') - eq_(Joined.__table__.c.keys(), ['timestamp', 'id']) - eq_(Joined.__table__.kwargs, {'mysql_engine': 'InnoDB'}) - - def test_non_propagating_mixin(self): - - class NoJoinedTableNameMixin: - - @declared_attr - def __tablename__(cls): - if decl.has_inherited_table(cls): - return None - return cls.__name__.lower() - - class BaseType(Base, NoJoinedTableNameMixin): - - discriminator = Column('type', String(50)) - __mapper_args__ = dict(polymorphic_on=discriminator) - id = Column(Integer, primary_key=True) - value = Column(Integer()) - - class Specific(BaseType): - - __mapper_args__ = dict(polymorphic_identity='specific') - - eq_(BaseType.__table__.name, 'basetype') - eq_(BaseType.__table__.c.keys(), ['type', 'id', 'value']) - assert Specific.__table__ is BaseType.__table__ - assert class_mapper(Specific).polymorphic_on \ - is BaseType.__table__.c.type - eq_(class_mapper(Specific).polymorphic_identity, 'specific') - - def test_non_propagating_mixin_used_for_joined(self): - - class TableNameMixin: - - @declared_attr - def __tablename__(cls): - if decl.has_inherited_table(cls) and TableNameMixin \ - not in cls.__bases__: - return None - return cls.__name__.lower() - - class BaseType(Base, TableNameMixin): - - discriminator = Column('type', String(50)) - __mapper_args__ = dict(polymorphic_on=discriminator) - id = Column(Integer, primary_key=True) - value = Column(Integer()) - - class Specific(BaseType, TableNameMixin): - - __mapper_args__ = dict(polymorphic_identity='specific') - id = Column(Integer, ForeignKey('basetype.id'), - primary_key=True) - - eq_(BaseType.__table__.name, 'basetype') - eq_(BaseType.__table__.c.keys(), ['type', 'id', 'value']) - eq_(Specific.__table__.name, 'specific') - eq_(Specific.__table__.c.keys(), ['id']) - - def test_single_back_propagate(self): - - class ColumnMixin: - - timestamp = Column(Integer) - - class BaseType(Base): - - __tablename__ = 'foo' - discriminator = Column('type', String(50)) - __mapper_args__ = dict(polymorphic_on=discriminator) - id = Column(Integer, primary_key=True) - - class Specific(BaseType, ColumnMixin): - - __mapper_args__ = dict(polymorphic_identity='specific') - - eq_(BaseType.__table__.c.keys(), ['type', 'id', 'timestamp']) - - def test_table_in_model_and_same_column_in_mixin(self): - - class ColumnMixin: - - data = Column(Integer) - - class Model(Base, ColumnMixin): - - __table__ = Table('foo', Base.metadata, Column('data', - Integer), Column('id', Integer, - primary_key=True)) - - model_col = Model.__table__.c.data - mixin_col = ColumnMixin.data - assert model_col is not mixin_col - eq_(model_col.name, 'data') - assert model_col.type.__class__ is mixin_col.type.__class__ - - def test_table_in_model_and_different_named_column_in_mixin(self): - - class ColumnMixin: - tada = Column(Integer) - - def go(): - - class Model(Base, ColumnMixin): - - __table__ = Table('foo', Base.metadata, - Column('data',Integer), - Column('id', Integer,primary_key=True)) - foo = relationship("Dest") - - assert_raises_message(sa.exc.ArgumentError, - "Can't add additional column 'tada' when " - "specifying __table__", go) - - def test_table_in_model_and_different_named_alt_key_column_in_mixin(self): - - # here, the __table__ has a column 'tada'. We disallow - # the add of the 'foobar' column, even though it's - # keyed to 'tada'. - - class ColumnMixin: - tada = Column('foobar', Integer) - - def go(): - - class Model(Base, ColumnMixin): - - __table__ = Table('foo', Base.metadata, - Column('data',Integer), - Column('tada', Integer), - Column('id', Integer,primary_key=True)) - foo = relationship("Dest") - - assert_raises_message(sa.exc.ArgumentError, - "Can't add additional column 'foobar' when " - "specifying __table__", go) - - def test_table_in_model_overrides_different_typed_column_in_mixin(self): - - class ColumnMixin: - - data = Column(String) - - class Model(Base, ColumnMixin): - - __table__ = Table('foo', Base.metadata, Column('data', - Integer), Column('id', Integer, - primary_key=True)) - - model_col = Model.__table__.c.data - mixin_col = ColumnMixin.data - assert model_col is not mixin_col - eq_(model_col.name, 'data') - assert model_col.type.__class__ is Integer - - def test_mixin_column_ordering(self): - - class Foo(object): - - col1 = Column(Integer) - col3 = Column(Integer) - - class Bar(object): - - col2 = Column(Integer) - col4 = Column(Integer) - - class Model(Base, Foo, Bar): - - id = Column(Integer, primary_key=True) - __tablename__ = 'model' - - eq_(Model.__table__.c.keys(), ['col1', 'col3', 'col2', 'col4', - 'id']) - - def test_honor_class_mro_one(self): - class HasXMixin(object): - @declared_attr - def x(self): - return Column(Integer) - - class Parent(HasXMixin, Base): - __tablename__ = 'parent' - id = Column(Integer, primary_key=True) - - class Child(Parent): - __tablename__ = 'child' - id = Column(Integer, ForeignKey('parent.id'), primary_key=True) - - assert "x" not in Child.__table__.c - - def test_honor_class_mro_two(self): - class HasXMixin(object): - @declared_attr - def x(self): - return Column(Integer) - - class Parent(HasXMixin, Base): - __tablename__ = 'parent' - id = Column(Integer, primary_key=True) - def x(self): - return "hi" - - class C(Parent): - __tablename__ = 'c' - id = Column(Integer, ForeignKey('parent.id'), primary_key=True) - - assert C().x() == 'hi' - - -class DeclarativeMixinPropertyTest(DeclarativeTestBase): - - def test_column_property(self): - - class MyMixin(object): - - @declared_attr - def prop_hoho(cls): - return column_property(Column('prop', String(50))) - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - - class MyOtherModel(Base, MyMixin): - - __tablename__ = 'othertest' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - - assert MyModel.__table__.c.prop is not None - assert MyOtherModel.__table__.c.prop is not None - assert MyModel.__table__.c.prop \ - is not MyOtherModel.__table__.c.prop - assert MyModel.prop_hoho.property.columns \ - == [MyModel.__table__.c.prop] - assert MyOtherModel.prop_hoho.property.columns \ - == [MyOtherModel.__table__.c.prop] - assert MyModel.prop_hoho.property \ - is not MyOtherModel.prop_hoho.property - Base.metadata.create_all() - sess = create_session() - m1, m2 = MyModel(prop_hoho='foo'), MyOtherModel(prop_hoho='bar') - sess.add_all([m1, m2]) - sess.flush() - eq_(sess.query(MyModel).filter(MyModel.prop_hoho == 'foo' - ).one(), m1) - eq_(sess.query(MyOtherModel).filter(MyOtherModel.prop_hoho - == 'bar').one(), m2) - - def test_doc(self): - """test documentation transfer. - - the documentation situation with @declared_attr is problematic. - at least see if mapped subclasses get the doc. - - """ - - class MyMixin(object): - - @declared_attr - def type_(cls): - """this is a document.""" - - return Column(String(50)) - - @declared_attr - def t2(cls): - """this is another document.""" - - return column_property(Column(String(50))) - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - - configure_mappers() - eq_(MyModel.type_.__doc__, """this is a document.""") - eq_(MyModel.t2.__doc__, """this is another document.""") - - def test_column_in_mapper_args(self): - - class MyMixin(object): - - @declared_attr - def type_(cls): - return Column(String(50)) - __mapper_args__ = {'polymorphic_on': type_} - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - id = Column(Integer, primary_key=True) - - configure_mappers() - col = MyModel.__mapper__.polymorphic_on - eq_(col.name, 'type_') - assert col.table is not None - - def test_deferred(self): - - class MyMixin(object): - - @declared_attr - def data(cls): - return deferred(Column('data', String(50))) - - class MyModel(Base, MyMixin): - - __tablename__ = 'test' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) - - Base.metadata.create_all() - sess = create_session() - sess.add_all([MyModel(data='d1'), MyModel(data='d2')]) - sess.flush() - sess.expunge_all() - d1, d2 = sess.query(MyModel).order_by(MyModel.data) - assert 'data' not in d1.__dict__ - assert d1.data == 'd1' - assert 'data' in d1.__dict__ - - def _test_relationship(self, usestring): +def _produce_test(inline, stringbased): - class RefTargetMixin(object): + class ExplicitJoinTest(fixtures.MappedTest): - @declared_attr - def target_id(cls): - return Column('target_id', ForeignKey('target.id')) - if usestring: + @classmethod + def define_tables(cls, metadata): + global User, Address + Base = decl.declarative_base(metadata=metadata) - @declared_attr - def target(cls): - return relationship('Target', - primaryjoin='Target.id==%s.target_id' - % cls.__name__) - else: + class User(Base, fixtures.ComparableEntity): - @declared_attr - def target(cls): - return relationship('Target') + __tablename__ = 'users' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) - class Foo(Base, RefTargetMixin): + class Address(Base, fixtures.ComparableEntity): - __tablename__ = 'foo' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) + __tablename__ = 'addresses' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + email = Column(String(50)) + user_id = Column(Integer, ForeignKey('users.id')) + if inline: + if stringbased: + user = relationship('User', + primaryjoin='User.id==Address.user_id', + backref='addresses') + else: + user = relationship(User, primaryjoin=User.id + == user_id, backref='addresses') - class Bar(Base, RefTargetMixin): + if not inline: + configure_mappers() + if stringbased: + Address.user = relationship('User', + primaryjoin='User.id==Address.user_id', + backref='addresses') + else: + Address.user = relationship(User, + primaryjoin=User.id == Address.user_id, + backref='addresses') - __tablename__ = 'bar' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) + @classmethod + def insert_data(cls): + params = [dict(zip(('id', 'name'), column_values)) + for column_values in [(7, 'jack'), (8, 'ed'), (9, + 'fred'), (10, 'chuck')]] + User.__table__.insert().execute(params) + Address.__table__.insert().execute([dict(zip(('id', + 'user_id', 'email'), column_values)) + for column_values in [(1, 7, 'jack@bean.com'), (2, + 8, 'ed@wood.com'), (3, 8, 'ed@bettyboop.com'), (4, + 8, 'ed@lala.com'), (5, 9, 'fred@fred.com')]]) - class Target(Base): + def test_aliased_join(self): - __tablename__ = 'target' - id = Column(Integer, primary_key=True, - test_needs_autoincrement=True) + # this query will screw up if the aliasing enabled in + # query.join() gets applied to the right half of the join + # condition inside the any(). the join condition inside of + # any() comes from the "primaryjoin" of the relationship, + # and should not be annotated with _orm_adapt. + # PropertyLoader.Comparator will annotate the left side with + # _orm_adapt, though. - Base.metadata.create_all() - sess = create_session() - t1, t2 = Target(), Target() - f1, f2, b1 = Foo(target=t1), Foo(target=t2), Bar(target=t1) - sess.add_all([f1, f2, b1]) - sess.flush() - eq_(sess.query(Foo).filter(Foo.target == t2).one(), f2) - eq_(sess.query(Bar).filter(Bar.target == t2).first(), None) - sess.expire_all() - eq_(f1.target, t1) + sess = create_session() + eq_(sess.query(User).join(User.addresses, + aliased=True).filter(Address.email == 'ed@wood.com' + ).filter(User.addresses.any(Address.email + == 'jack@bean.com')).all(), []) - def test_relationship(self): - self._test_relationship(False) + ExplicitJoinTest.__name__ = 'ExplicitJoinTest%s%s' % (inline + and 'Inline' or 'Separate', stringbased and 'String' + or 'Literal') + return ExplicitJoinTest - def test_relationship_primryjoin(self): - self._test_relationship(True) +for inline in True, False: + for stringbased in True, False: + testclass = _produce_test(inline, stringbased) + exec '%s = testclass' % testclass.__name__ + del testclass diff --git a/test/ext/test_declarative_inheritance.py b/test/ext/test_declarative_inheritance.py new file mode 100644 index 0000000000..e2a8dcc9a0 --- /dev/null +++ b/test/ext/test_declarative_inheritance.py @@ -0,0 +1,988 @@ + +from test.lib.testing import eq_, assert_raises, \ + assert_raises_message, is_ +from sqlalchemy.ext import declarative as decl +from sqlalchemy import exc +import sqlalchemy as sa +from test.lib import testing +from sqlalchemy import MetaData, Integer, String, ForeignKey, \ + ForeignKeyConstraint, Index +from test.lib.schema import Table, Column +from sqlalchemy.orm import relationship, create_session, class_mapper, \ + joinedload, configure_mappers, backref, clear_mappers, \ + polymorphic_union, deferred, column_property, composite,\ + Session +from test.lib.testing import eq_ +from sqlalchemy.util import classproperty +from sqlalchemy.ext.declarative import declared_attr, AbstractConcreteBase, ConcreteBase +from test.lib import fixtures + +class DeclarativeTestBase(fixtures.TestBase, testing.AssertsExecutionResults): + def setup(self): + global Base + Base = decl.declarative_base(testing.db) + + def teardown(self): + Session.close_all() + clear_mappers() + Base.metadata.drop_all() + +class DeclarativeInheritanceTest(DeclarativeTestBase): + + def test_we_must_copy_mapper_args(self): + + class Person(Base): + + __tablename__ = 'people' + id = Column(Integer, primary_key=True) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator, + 'polymorphic_identity': 'person'} + + class Engineer(Person): + + primary_language = Column(String(50)) + + assert 'inherits' not in Person.__mapper_args__ + assert class_mapper(Engineer).polymorphic_identity is None + assert class_mapper(Engineer).polymorphic_on is Person.__table__.c.type + + def test_we_must_only_copy_column_mapper_args(self): + + class Person(Base): + + __tablename__ = 'people' + id = Column(Integer, primary_key=True) + a=Column(Integer) + b=Column(Integer) + c=Column(Integer) + d=Column(Integer) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator, + 'polymorphic_identity': 'person', + 'version_id_col': 'a', + 'column_prefix': 'bar', + 'include_properties': ['id', 'a', 'b'], + } + assert class_mapper(Person).version_id_col == 'a' + assert class_mapper(Person).include_properties == set(['id', 'a', 'b']) + + + def test_custom_join_condition(self): + + class Foo(Base): + + __tablename__ = 'foo' + id = Column('id', Integer, primary_key=True) + + class Bar(Foo): + + __tablename__ = 'bar' + id = Column('id', Integer, primary_key=True) + foo_id = Column('foo_id', Integer) + __mapper_args__ = {'inherit_condition': foo_id == Foo.id} + + # compile succeeds because inherit_condition is honored + + configure_mappers() + + def test_joined(self): + + class Company(Base, fixtures.ComparableEntity): + + __tablename__ = 'companies' + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column('name', String(50)) + employees = relationship('Person') + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + company_id = Column('company_id', Integer, + ForeignKey('companies.id')) + name = Column('name', String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Engineer(Person): + + __tablename__ = 'engineers' + __mapper_args__ = {'polymorphic_identity': 'engineer'} + id = Column('id', Integer, ForeignKey('people.id'), + primary_key=True) + primary_language = Column('primary_language', String(50)) + + class Manager(Person): + + __tablename__ = 'managers' + __mapper_args__ = {'polymorphic_identity': 'manager'} + id = Column('id', Integer, ForeignKey('people.id'), + primary_key=True) + golf_swing = Column('golf_swing', String(50)) + + Base.metadata.create_all() + sess = create_session() + c1 = Company(name='MegaCorp, Inc.', + employees=[Engineer(name='dilbert', + primary_language='java'), Engineer(name='wally', + primary_language='c++'), Manager(name='dogbert', + golf_swing='fore!')]) + c2 = Company(name='Elbonia, Inc.', + employees=[Engineer(name='vlad', + primary_language='cobol')]) + sess.add(c1) + sess.add(c2) + sess.flush() + sess.expunge_all() + eq_(sess.query(Company).filter(Company.employees.of_type(Engineer). + any(Engineer.primary_language + == 'cobol')).first(), c2) + + # ensure that the Manager mapper was compiled with the Manager id + # column as higher priority. this ensures that "Manager.id" + # is appropriately treated as the "id" column in the "manager" + # table (reversed from 0.6's behavior.) + + assert Manager.id.property.columns == [Manager.__table__.c.id, Person.__table__.c.id] + + # assert that the "id" column is available without a second + # load. as of 0.7, the ColumnProperty tests all columns + # in it's list to see which is present in the row. + + sess.expunge_all() + + def go(): + assert sess.query(Manager).filter(Manager.name == 'dogbert' + ).one().id + self.assert_sql_count(testing.db, go, 1) + sess.expunge_all() + + def go(): + assert sess.query(Person).filter(Manager.name == 'dogbert' + ).one().id + + self.assert_sql_count(testing.db, go, 1) + + def test_add_subcol_after_the_fact(self): + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column('name', String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Engineer(Person): + + __tablename__ = 'engineers' + __mapper_args__ = {'polymorphic_identity': 'engineer'} + id = Column('id', Integer, ForeignKey('people.id'), + primary_key=True) + + Engineer.primary_language = Column('primary_language', + String(50)) + Base.metadata.create_all() + sess = create_session() + e1 = Engineer(primary_language='java', name='dilbert') + sess.add(e1) + sess.flush() + sess.expunge_all() + eq_(sess.query(Person).first(), Engineer(primary_language='java' + , name='dilbert')) + + def test_add_parentcol_after_the_fact(self): + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Engineer(Person): + + __tablename__ = 'engineers' + __mapper_args__ = {'polymorphic_identity': 'engineer'} + primary_language = Column(String(50)) + id = Column('id', Integer, ForeignKey('people.id'), + primary_key=True) + + Person.name = Column('name', String(50)) + Base.metadata.create_all() + sess = create_session() + e1 = Engineer(primary_language='java', name='dilbert') + sess.add(e1) + sess.flush() + sess.expunge_all() + eq_(sess.query(Person).first(), + Engineer(primary_language='java', name='dilbert')) + + def test_add_sub_parentcol_after_the_fact(self): + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Engineer(Person): + + __tablename__ = 'engineers' + __mapper_args__ = {'polymorphic_identity': 'engineer'} + primary_language = Column(String(50)) + id = Column('id', Integer, ForeignKey('people.id'), + primary_key=True) + + class Admin(Engineer): + + __tablename__ = 'admins' + __mapper_args__ = {'polymorphic_identity': 'admin'} + workstation = Column(String(50)) + id = Column('id', Integer, ForeignKey('engineers.id'), + primary_key=True) + + Person.name = Column('name', String(50)) + Base.metadata.create_all() + sess = create_session() + e1 = Admin(primary_language='java', name='dilbert', + workstation='foo') + sess.add(e1) + sess.flush() + sess.expunge_all() + eq_(sess.query(Person).first(), Admin(primary_language='java', + name='dilbert', workstation='foo')) + + def test_subclass_mixin(self): + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column('id', Integer, primary_key=True) + name = Column('name', String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class MyMixin(object): + + pass + + class Engineer(MyMixin, Person): + + __tablename__ = 'engineers' + __mapper_args__ = {'polymorphic_identity': 'engineer'} + id = Column('id', Integer, ForeignKey('people.id'), + primary_key=True) + primary_language = Column('primary_language', String(50)) + + assert class_mapper(Engineer).inherits is class_mapper(Person) + + @testing.fails_if(lambda: True, "Not implemented until 0.7") + def test_foreign_keys_with_col(self): + """Test that foreign keys that reference a literal 'id' subclass + 'id' attribute behave intuitively. + + See [ticket:1892]. + + """ + + class Booking(Base): + __tablename__ = 'booking' + id = Column(Integer, primary_key=True) + + class PlanBooking(Booking): + __tablename__ = 'plan_booking' + id = Column(Integer, ForeignKey(Booking.id), + primary_key=True) + + # referencing PlanBooking.id gives us the column + # on plan_booking, not booking + class FeatureBooking(Booking): + __tablename__ = 'feature_booking' + id = Column(Integer, ForeignKey(Booking.id), + primary_key=True) + plan_booking_id = Column(Integer, + ForeignKey(PlanBooking.id)) + + plan_booking = relationship(PlanBooking, + backref='feature_bookings') + + assert FeatureBooking.__table__.c.plan_booking_id.\ + references(PlanBooking.__table__.c.id) + + assert FeatureBooking.__table__.c.id.\ + references(Booking.__table__.c.id) + + def test_with_undefined_foreignkey(self): + + class Parent(Base): + + __tablename__ = 'parent' + id = Column('id', Integer, primary_key=True) + tp = Column('type', String(50)) + __mapper_args__ = dict(polymorphic_on=tp) + + class Child1(Parent): + + __tablename__ = 'child1' + id = Column('id', Integer, ForeignKey('parent.id'), + primary_key=True) + related_child2 = Column('c2', Integer, + ForeignKey('child2.id')) + __mapper_args__ = dict(polymorphic_identity='child1') + + # no exception is raised by the ForeignKey to "child2" even + # though child2 doesn't exist yet + + class Child2(Parent): + + __tablename__ = 'child2' + id = Column('id', Integer, ForeignKey('parent.id'), + primary_key=True) + related_child1 = Column('c1', Integer) + __mapper_args__ = dict(polymorphic_identity='child2') + + sa.orm.configure_mappers() # no exceptions here + + def test_foreign_keys_with_col(self): + """Test that foreign keys that reference a literal 'id' subclass + 'id' attribute behave intuitively. + + See [ticket:1892]. + + """ + + class Booking(Base): + __tablename__ = 'booking' + id = Column(Integer, primary_key=True) + + class PlanBooking(Booking): + __tablename__ = 'plan_booking' + id = Column(Integer, ForeignKey(Booking.id), + primary_key=True) + + # referencing PlanBooking.id gives us the column + # on plan_booking, not booking + class FeatureBooking(Booking): + __tablename__ = 'feature_booking' + id = Column(Integer, ForeignKey(Booking.id), + primary_key=True) + plan_booking_id = Column(Integer, + ForeignKey(PlanBooking.id)) + + plan_booking = relationship(PlanBooking, + backref='feature_bookings') + + assert FeatureBooking.__table__.c.plan_booking_id.\ + references(PlanBooking.__table__.c.id) + + assert FeatureBooking.__table__.c.id.\ + references(Booking.__table__.c.id) + + + def test_single_colsonbase(self): + """test single inheritance where all the columns are on the base + class.""" + + class Company(Base, fixtures.ComparableEntity): + + __tablename__ = 'companies' + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column('name', String(50)) + employees = relationship('Person') + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + company_id = Column('company_id', Integer, + ForeignKey('companies.id')) + name = Column('name', String(50)) + discriminator = Column('type', String(50)) + primary_language = Column('primary_language', String(50)) + golf_swing = Column('golf_swing', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Engineer(Person): + + __mapper_args__ = {'polymorphic_identity': 'engineer'} + + class Manager(Person): + + __mapper_args__ = {'polymorphic_identity': 'manager'} + + Base.metadata.create_all() + sess = create_session() + c1 = Company(name='MegaCorp, Inc.', + employees=[Engineer(name='dilbert', + primary_language='java'), Engineer(name='wally', + primary_language='c++'), Manager(name='dogbert', + golf_swing='fore!')]) + c2 = Company(name='Elbonia, Inc.', + employees=[Engineer(name='vlad', + primary_language='cobol')]) + sess.add(c1) + sess.add(c2) + sess.flush() + sess.expunge_all() + eq_(sess.query(Person).filter(Engineer.primary_language + == 'cobol').first(), Engineer(name='vlad')) + eq_(sess.query(Company).filter(Company.employees.of_type(Engineer). + any(Engineer.primary_language + == 'cobol')).first(), c2) + + def test_single_colsonsub(self): + """test single inheritance where the columns are local to their + class. + + this is a newer usage. + + """ + + class Company(Base, fixtures.ComparableEntity): + + __tablename__ = 'companies' + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column('name', String(50)) + employees = relationship('Person') + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + company_id = Column(Integer, ForeignKey('companies.id')) + name = Column(String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Engineer(Person): + + __mapper_args__ = {'polymorphic_identity': 'engineer'} + primary_language = Column(String(50)) + + class Manager(Person): + + __mapper_args__ = {'polymorphic_identity': 'manager'} + golf_swing = Column(String(50)) + + # we have here a situation that is somewhat unique. the Person + # class is mapped to the "people" table, but it was mapped when + # the table did not include the "primary_language" or + # "golf_swing" columns. declarative will also manipulate the + # exclude_properties collection so that sibling classes don't + # cross-pollinate. + + assert Person.__table__.c.company_id is not None + assert Person.__table__.c.golf_swing is not None + assert Person.__table__.c.primary_language is not None + assert Engineer.primary_language is not None + assert Manager.golf_swing is not None + assert not hasattr(Person, 'primary_language') + assert not hasattr(Person, 'golf_swing') + assert not hasattr(Engineer, 'golf_swing') + assert not hasattr(Manager, 'primary_language') + Base.metadata.create_all() + sess = create_session() + e1 = Engineer(name='dilbert', primary_language='java') + e2 = Engineer(name='wally', primary_language='c++') + m1 = Manager(name='dogbert', golf_swing='fore!') + c1 = Company(name='MegaCorp, Inc.', employees=[e1, e2, m1]) + e3 = Engineer(name='vlad', primary_language='cobol') + c2 = Company(name='Elbonia, Inc.', employees=[e3]) + sess.add(c1) + sess.add(c2) + sess.flush() + sess.expunge_all() + eq_(sess.query(Person).filter(Engineer.primary_language + == 'cobol').first(), Engineer(name='vlad')) + eq_(sess.query(Company).filter(Company.employees.of_type(Engineer). + any(Engineer.primary_language + == 'cobol')).first(), c2) + eq_(sess.query(Engineer).filter_by(primary_language='cobol' + ).one(), Engineer(name='vlad', primary_language='cobol')) + + def test_joined_from_single(self): + + class Company(Base, fixtures.ComparableEntity): + + __tablename__ = 'companies' + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column('name', String(50)) + employees = relationship('Person') + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + company_id = Column(Integer, ForeignKey('companies.id')) + name = Column(String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Manager(Person): + + __mapper_args__ = {'polymorphic_identity': 'manager'} + golf_swing = Column(String(50)) + + class Engineer(Person): + + __tablename__ = 'engineers' + __mapper_args__ = {'polymorphic_identity': 'engineer'} + id = Column(Integer, ForeignKey('people.id'), + primary_key=True) + primary_language = Column(String(50)) + + assert Person.__table__.c.golf_swing is not None + assert not Person.__table__.c.has_key('primary_language') + assert Engineer.__table__.c.primary_language is not None + assert Engineer.primary_language is not None + assert Manager.golf_swing is not None + assert not hasattr(Person, 'primary_language') + assert not hasattr(Person, 'golf_swing') + assert not hasattr(Engineer, 'golf_swing') + assert not hasattr(Manager, 'primary_language') + Base.metadata.create_all() + sess = create_session() + e1 = Engineer(name='dilbert', primary_language='java') + e2 = Engineer(name='wally', primary_language='c++') + m1 = Manager(name='dogbert', golf_swing='fore!') + c1 = Company(name='MegaCorp, Inc.', employees=[e1, e2, m1]) + e3 = Engineer(name='vlad', primary_language='cobol') + c2 = Company(name='Elbonia, Inc.', employees=[e3]) + sess.add(c1) + sess.add(c2) + sess.flush() + sess.expunge_all() + eq_(sess.query(Person).with_polymorphic(Engineer). + filter(Engineer.primary_language + == 'cobol').first(), Engineer(name='vlad')) + eq_(sess.query(Company).filter(Company.employees.of_type(Engineer). + any(Engineer.primary_language + == 'cobol')).first(), c2) + eq_(sess.query(Engineer).filter_by(primary_language='cobol' + ).one(), Engineer(name='vlad', primary_language='cobol')) + + def test_polymorphic_on_converted_from_inst(self): + class A(Base): + __tablename__ = 'A' + id = Column(Integer, primary_key=True) + discriminator = Column(String) + + @declared_attr + def __mapper_args__(cls): + return { + 'polymorphic_identity': cls.__name__, + 'polymorphic_on': cls.discriminator + } + + class B(A): + pass + is_(B.__mapper__.polymorphic_on, A.__table__.c.discriminator) + + def test_add_deferred(self): + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + + Person.name = deferred(Column(String(10))) + Base.metadata.create_all() + sess = create_session() + p = Person(name='ratbert') + sess.add(p) + sess.flush() + sess.expunge_all() + eq_(sess.query(Person).all(), [Person(name='ratbert')]) + sess.expunge_all() + person = sess.query(Person).filter(Person.name == 'ratbert' + ).one() + assert 'name' not in person.__dict__ + + def test_single_fksonsub(self): + """test single inheritance with a foreign key-holding column on + a subclass. + + """ + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Engineer(Person): + + __mapper_args__ = {'polymorphic_identity': 'engineer'} + primary_language_id = Column(Integer, + ForeignKey('languages.id')) + primary_language = relationship('Language') + + class Language(Base, fixtures.ComparableEntity): + + __tablename__ = 'languages' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + + assert not hasattr(Person, 'primary_language_id') + Base.metadata.create_all() + sess = create_session() + java, cpp, cobol = Language(name='java'), Language(name='cpp'), \ + Language(name='cobol') + e1 = Engineer(name='dilbert', primary_language=java) + e2 = Engineer(name='wally', primary_language=cpp) + e3 = Engineer(name='vlad', primary_language=cobol) + sess.add_all([e1, e2, e3]) + sess.flush() + sess.expunge_all() + eq_(sess.query(Person).filter(Engineer.primary_language.has( + Language.name + == 'cobol')).first(), Engineer(name='vlad', + primary_language=Language(name='cobol'))) + eq_(sess.query(Engineer).filter(Engineer.primary_language.has( + Language.name + == 'cobol')).one(), Engineer(name='vlad', + primary_language=Language(name='cobol'))) + eq_(sess.query(Person).join(Engineer.primary_language).order_by( + Language.name).all(), + [Engineer(name='vlad', + primary_language=Language(name='cobol')), + Engineer(name='wally', primary_language=Language(name='cpp' + )), Engineer(name='dilbert', + primary_language=Language(name='java'))]) + + def test_single_three_levels(self): + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column(Integer, primary_key=True) + name = Column(String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Engineer(Person): + + __mapper_args__ = {'polymorphic_identity': 'engineer'} + primary_language = Column(String(50)) + + class JuniorEngineer(Engineer): + + __mapper_args__ = \ + {'polymorphic_identity': 'junior_engineer'} + nerf_gun = Column(String(50)) + + class Manager(Person): + + __mapper_args__ = {'polymorphic_identity': 'manager'} + golf_swing = Column(String(50)) + + assert JuniorEngineer.nerf_gun + assert JuniorEngineer.primary_language + assert JuniorEngineer.name + assert Manager.golf_swing + assert Engineer.primary_language + assert not hasattr(Engineer, 'golf_swing') + assert not hasattr(Engineer, 'nerf_gun') + assert not hasattr(Manager, 'nerf_gun') + assert not hasattr(Manager, 'primary_language') + + def test_single_detects_conflict(self): + + class Person(Base): + + __tablename__ = 'people' + id = Column(Integer, primary_key=True) + name = Column(String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + class Engineer(Person): + + __mapper_args__ = {'polymorphic_identity': 'engineer'} + primary_language = Column(String(50)) + + # test sibling col conflict + + def go(): + + class Manager(Person): + + __mapper_args__ = {'polymorphic_identity': 'manager'} + golf_swing = Column(String(50)) + primary_language = Column(String(50)) + + assert_raises(sa.exc.ArgumentError, go) + + # test parent col conflict + + def go(): + + class Salesman(Person): + + __mapper_args__ = {'polymorphic_identity': 'manager'} + name = Column(String(50)) + + assert_raises(sa.exc.ArgumentError, go) + + def test_single_no_special_cols(self): + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column('id', Integer, primary_key=True) + name = Column('name', String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + def go(): + + class Engineer(Person): + + __mapper_args__ = {'polymorphic_identity': 'engineer'} + primary_language = Column('primary_language', + String(50)) + foo_bar = Column(Integer, primary_key=True) + + assert_raises_message(sa.exc.ArgumentError, 'place primary key' + , go) + + def test_single_no_table_args(self): + + class Person(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column('id', Integer, primary_key=True) + name = Column('name', String(50)) + discriminator = Column('type', String(50)) + __mapper_args__ = {'polymorphic_on': discriminator} + + def go(): + + class Engineer(Person): + + __mapper_args__ = {'polymorphic_identity': 'engineer'} + primary_language = Column('primary_language', + String(50)) + + # this should be on the Person class, as this is single + # table inheritance, which is why we test that this + # throws an exception! + + __table_args__ = {'mysql_engine': 'InnoDB'} + + assert_raises_message(sa.exc.ArgumentError, + 'place __table_args__', go) + + @testing.emits_warning("The classname") + def test_dupe_name_in_hierarchy(self): + class A(Base): + __tablename__ = "a" + id = Column( Integer, primary_key=True) + a_1 = A + class A(a_1): + __tablename__ = 'b' + + id = Column(Integer(),ForeignKey(a_1.id), primary_key = True) + + assert A.__mapper__.inherits is a_1.__mapper__ + +from test.orm.test_events import _RemoveListeners +class ConcreteInhTest(_RemoveListeners, DeclarativeTestBase): + def _roundtrip(self, Employee, Manager, Engineer, Boss, polymorphic=True): + Base.metadata.create_all() + sess = create_session() + e1 = Engineer(name='dilbert', primary_language='java') + e2 = Engineer(name='wally', primary_language='c++') + m1 = Manager(name='dogbert', golf_swing='fore!') + e3 = Engineer(name='vlad', primary_language='cobol') + b1 = Boss(name="pointy haired") + sess.add_all([e1, e2, m1, e3, b1]) + sess.flush() + sess.expunge_all() + if polymorphic: + eq_(sess.query(Employee).order_by(Employee.name).all(), + [Engineer(name='dilbert'), Manager(name='dogbert'), + Boss(name='pointy haired'), Engineer(name='vlad'), Engineer(name='wally')]) + else: + eq_(sess.query(Engineer).order_by(Engineer.name).all(), + [Engineer(name='dilbert'), Engineer(name='vlad'), + Engineer(name='wally')]) + eq_(sess.query(Manager).all(), [Manager(name='dogbert')]) + eq_(sess.query(Boss).all(), [Boss(name='pointy haired')]) + + + def test_explicit(self): + engineers = Table('engineers', Base.metadata, Column('id', + Integer, primary_key=True, + test_needs_autoincrement=True), Column('name' + , String(50)), Column('primary_language', + String(50))) + managers = Table('managers', Base.metadata, + Column('id',Integer, primary_key=True, test_needs_autoincrement=True), + Column('name', String(50)), + Column('golf_swing', String(50)) + ) + boss = Table('boss', Base.metadata, + Column('id',Integer, primary_key=True, test_needs_autoincrement=True), + Column('name', String(50)), + Column('golf_swing', String(50)) + ) + punion = polymorphic_union({ + 'engineer': engineers, + 'manager' : managers, + 'boss': boss}, 'type', 'punion') + + class Employee(Base, fixtures.ComparableEntity): + + __table__ = punion + __mapper_args__ = {'polymorphic_on': punion.c.type} + + class Engineer(Employee): + + __table__ = engineers + __mapper_args__ = {'polymorphic_identity': 'engineer', + 'concrete': True} + + class Manager(Employee): + + __table__ = managers + __mapper_args__ = {'polymorphic_identity': 'manager', + 'concrete': True} + + class Boss(Manager): + __table__ = boss + __mapper_args__ = {'polymorphic_identity': 'boss', + 'concrete': True} + + self._roundtrip(Employee, Manager, Engineer, Boss) + + def test_concrete_inline_non_polymorphic(self): + """test the example from the declarative docs.""" + + class Employee(Base, fixtures.ComparableEntity): + + __tablename__ = 'people' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + + class Engineer(Employee): + + __tablename__ = 'engineers' + __mapper_args__ = {'concrete': True} + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + primary_language = Column(String(50)) + name = Column(String(50)) + + class Manager(Employee): + + __tablename__ = 'manager' + __mapper_args__ = {'concrete': True} + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + golf_swing = Column(String(50)) + name = Column(String(50)) + + class Boss(Manager): + __tablename__ = 'boss' + __mapper_args__ = {'concrete': True} + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + golf_swing = Column(String(50)) + name = Column(String(50)) + + self._roundtrip(Employee, Manager, Engineer, Boss, polymorphic=False) + + def test_abstract_concrete_extension(self): + class Employee(AbstractConcreteBase, Base, fixtures.ComparableEntity): + pass + + class Manager(Employee): + __tablename__ = 'manager' + employee_id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + golf_swing = Column(String(40)) + __mapper_args__ = { + 'polymorphic_identity':'manager', + 'concrete':True} + + class Boss(Manager): + __tablename__ = 'boss' + employee_id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + golf_swing = Column(String(40)) + __mapper_args__ = { + 'polymorphic_identity':'boss', + 'concrete':True} + + class Engineer(Employee): + __tablename__ = 'engineer' + employee_id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + primary_language = Column(String(40)) + __mapper_args__ = {'polymorphic_identity':'engineer', + 'concrete':True} + + self._roundtrip(Employee, Manager, Engineer, Boss) + + def test_concrete_extension(self): + class Employee(ConcreteBase, Base, fixtures.ComparableEntity): + __tablename__ = 'employee' + employee_id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + __mapper_args__ = { + 'polymorphic_identity':'employee', + 'concrete':True} + class Manager(Employee): + __tablename__ = 'manager' + employee_id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + golf_swing = Column(String(40)) + __mapper_args__ = { + 'polymorphic_identity':'manager', + 'concrete':True} + + class Boss(Manager): + __tablename__ = 'boss' + employee_id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + golf_swing = Column(String(40)) + __mapper_args__ = { + 'polymorphic_identity':'boss', + 'concrete':True} + + class Engineer(Employee): + __tablename__ = 'engineer' + employee_id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + name = Column(String(50)) + primary_language = Column(String(40)) + __mapper_args__ = {'polymorphic_identity':'engineer', + 'concrete':True} + self._roundtrip(Employee, Manager, Engineer, Boss) diff --git a/test/ext/test_declarative_mixin.py b/test/ext/test_declarative_mixin.py new file mode 100644 index 0000000000..9de56134f8 --- /dev/null +++ b/test/ext/test_declarative_mixin.py @@ -0,0 +1,1082 @@ +from test.lib.testing import eq_, assert_raises, \ + assert_raises_message +from sqlalchemy.ext import declarative as decl +import sqlalchemy as sa +from test.lib import testing +from sqlalchemy import Integer, String, ForeignKey +from test.lib.schema import Table, Column +from sqlalchemy.orm import relationship, create_session, class_mapper, \ + configure_mappers, clear_mappers, \ + deferred, column_property, \ + Session +from sqlalchemy.util import classproperty +from sqlalchemy.ext.declarative import declared_attr +from test.lib import fixtures + +class DeclarativeTestBase(fixtures.TestBase, testing.AssertsExecutionResults): + def setup(self): + global Base + Base = decl.declarative_base(testing.db) + + def teardown(self): + Session.close_all() + clear_mappers() + Base.metadata.drop_all() + +class DeclarativeMixinTest(DeclarativeTestBase): + + def test_simple(self): + + class MyMixin(object): + + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + + def foo(self): + return 'bar' + str(self.id) + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + name = Column(String(100), nullable=False, index=True) + + Base.metadata.create_all() + session = create_session() + session.add(MyModel(name='testing')) + session.flush() + session.expunge_all() + obj = session.query(MyModel).one() + eq_(obj.id, 1) + eq_(obj.name, 'testing') + eq_(obj.foo(), 'bar1') + + def test_unique_column(self): + + class MyMixin(object): + + id = Column(Integer, primary_key=True) + value = Column(String, unique=True) + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + + assert MyModel.__table__.c.value.unique + + def test_hierarchical_bases(self): + + class MyMixinParent: + + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + + def foo(self): + return 'bar' + str(self.id) + + class MyMixin(MyMixinParent): + + baz = Column(String(100), nullable=False, index=True) + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + name = Column(String(100), nullable=False, index=True) + + Base.metadata.create_all() + session = create_session() + session.add(MyModel(name='testing', baz='fu')) + session.flush() + session.expunge_all() + obj = session.query(MyModel).one() + eq_(obj.id, 1) + eq_(obj.name, 'testing') + eq_(obj.foo(), 'bar1') + eq_(obj.baz, 'fu') + + def test_mixin_overrides(self): + """test a mixin that overrides a column on a superclass.""" + + class MixinA(object): + foo = Column(String(50)) + + class MixinB(MixinA): + foo = Column(Integer) + + class MyModelA(Base, MixinA): + __tablename__ = 'testa' + id = Column(Integer, primary_key=True) + + class MyModelB(Base, MixinB): + __tablename__ = 'testb' + id = Column(Integer, primary_key=True) + + eq_(MyModelA.__table__.c.foo.type.__class__, String) + eq_(MyModelB.__table__.c.foo.type.__class__, Integer) + + + def test_not_allowed(self): + + class MyMixin: + foo = Column(Integer, ForeignKey('bar.id')) + + def go(): + class MyModel(Base, MyMixin): + __tablename__ = 'foo' + + assert_raises(sa.exc.InvalidRequestError, go) + + class MyRelMixin: + foo = relationship('Bar') + + def go(): + class MyModel(Base, MyRelMixin): + + __tablename__ = 'foo' + + assert_raises(sa.exc.InvalidRequestError, go) + + class MyDefMixin: + foo = deferred(Column('foo', String)) + + def go(): + class MyModel(Base, MyDefMixin): + __tablename__ = 'foo' + + assert_raises(sa.exc.InvalidRequestError, go) + + class MyCPropMixin: + foo = column_property(Column('foo', String)) + + def go(): + class MyModel(Base, MyCPropMixin): + __tablename__ = 'foo' + + assert_raises(sa.exc.InvalidRequestError, go) + + def test_table_name_inherited(self): + + class MyMixin: + @declared_attr + def __tablename__(cls): + return cls.__name__.lower() + id = Column(Integer, primary_key=True) + + class MyModel(Base, MyMixin): + pass + + eq_(MyModel.__table__.name, 'mymodel') + + def test_classproperty_still_works(self): + class MyMixin(object): + @classproperty + def __tablename__(cls): + return cls.__name__.lower() + id = Column(Integer, primary_key=True) + + class MyModel(Base, MyMixin): + __tablename__ = 'overridden' + + eq_(MyModel.__table__.name, 'overridden') + + def test_table_name_not_inherited(self): + + class MyMixin: + @declared_attr + def __tablename__(cls): + return cls.__name__.lower() + id = Column(Integer, primary_key=True) + + class MyModel(Base, MyMixin): + __tablename__ = 'overridden' + + eq_(MyModel.__table__.name, 'overridden') + + def test_table_name_inheritance_order(self): + + class MyMixin1: + @declared_attr + def __tablename__(cls): + return cls.__name__.lower() + '1' + + class MyMixin2: + @declared_attr + def __tablename__(cls): + return cls.__name__.lower() + '2' + + class MyModel(Base, MyMixin1, MyMixin2): + id = Column(Integer, primary_key=True) + + eq_(MyModel.__table__.name, 'mymodel1') + + def test_table_name_dependent_on_subclass(self): + + class MyHistoryMixin: + @declared_attr + def __tablename__(cls): + return cls.parent_name + '_changelog' + + class MyModel(Base, MyHistoryMixin): + parent_name = 'foo' + id = Column(Integer, primary_key=True) + + eq_(MyModel.__table__.name, 'foo_changelog') + + def test_table_args_inherited(self): + + class MyMixin: + __table_args__ = {'mysql_engine': 'InnoDB'} + + class MyModel(Base, MyMixin): + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + + eq_(MyModel.__table__.kwargs, {'mysql_engine': 'InnoDB'}) + + def test_table_args_inherited_descriptor(self): + + class MyMixin: + @declared_attr + def __table_args__(cls): + return {'info': cls.__name__} + + class MyModel(Base, MyMixin): + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + + eq_(MyModel.__table__.info, 'MyModel') + + def test_table_args_inherited_single_table_inheritance(self): + + class MyMixin: + __table_args__ = {'mysql_engine': 'InnoDB'} + + class General(Base, MyMixin): + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + type_ = Column(String(50)) + __mapper__args = {'polymorphic_on': type_} + + class Specific(General): + __mapper_args__ = {'polymorphic_identity': 'specific'} + + assert Specific.__table__ is General.__table__ + eq_(General.__table__.kwargs, {'mysql_engine': 'InnoDB'}) + + def test_columns_single_table_inheritance(self): + """Test a column on a mixin with an alternate attribute name, + mapped to a superclass and single-table inheritance subclass. + The superclass table gets the column, the subclass shares + the MapperProperty. + + """ + + class MyMixin(object): + foo = Column('foo', Integer) + bar = Column('bar_newname', Integer) + + class General(Base, MyMixin): + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + type_ = Column(String(50)) + __mapper__args = {'polymorphic_on': type_} + + class Specific(General): + __mapper_args__ = {'polymorphic_identity': 'specific'} + + assert General.bar.prop.columns[0] is General.__table__.c.bar_newname + assert len(General.bar.prop.columns) == 1 + assert Specific.bar.prop is General.bar.prop + + def test_columns_joined_table_inheritance(self): + """Test a column on a mixin with an alternate attribute name, + mapped to a superclass and joined-table inheritance subclass. + Both tables get the column, in the case of the subclass the two + columns are joined under one MapperProperty. + + """ + + class MyMixin(object): + foo = Column('foo', Integer) + bar = Column('bar_newname', Integer) + + class General(Base, MyMixin): + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + type_ = Column(String(50)) + __mapper__args = {'polymorphic_on': type_} + + class Specific(General): + __tablename__ = 'sub' + id = Column(Integer, ForeignKey('test.id'), primary_key=True) + __mapper_args__ = {'polymorphic_identity': 'specific'} + + assert General.bar.prop.columns[0] is General.__table__.c.bar_newname + assert len(General.bar.prop.columns) == 1 + assert Specific.bar.prop is not General.bar.prop + assert len(Specific.bar.prop.columns) == 2 + assert Specific.bar.prop.columns[0] is General.__table__.c.bar_newname + assert Specific.bar.prop.columns[1] is Specific.__table__.c.bar_newname + + def test_column_join_checks_superclass_type(self): + """Test that the logic which joins subclass props to those + of the superclass checks that the superclass property is a column. + + """ + + class General(Base): + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + general_id = Column(Integer, ForeignKey('test.id')) + type_ = relationship("General") + + class Specific(General): + __tablename__ = 'sub' + id = Column(Integer, ForeignKey('test.id'), primary_key=True) + type_ = Column('foob', String(50)) + + assert isinstance(General.type_.property, sa.orm.RelationshipProperty) + assert Specific.type_.property.columns[0] is Specific.__table__.c.foob + + def test_column_join_checks_subclass_type(self): + """Test that the logic which joins subclass props to those + of the superclass checks that the subclass property is a column. + + """ + + def go(): + class General(Base): + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + type_ = Column('foob', Integer) + + class Specific(General): + __tablename__ = 'sub' + id = Column(Integer, ForeignKey('test.id'), primary_key=True) + specific_id = Column(Integer, ForeignKey('sub.id')) + type_ = relationship("Specific") + assert_raises_message( + sa.exc.ArgumentError, "column 'foob' conflicts with property", go + ) + + def test_table_args_overridden(self): + + class MyMixin: + __table_args__ = {'mysql_engine': 'Foo'} + + class MyModel(Base, MyMixin): + __tablename__ = 'test' + __table_args__ = {'mysql_engine': 'InnoDB'} + id = Column(Integer, primary_key=True) + + eq_(MyModel.__table__.kwargs, {'mysql_engine': 'InnoDB'}) + + def test_mapper_args_declared_attr(self): + + class ComputedMapperArgs: + @declared_attr + def __mapper_args__(cls): + if cls.__name__ == 'Person': + return {'polymorphic_on': cls.discriminator} + else: + return {'polymorphic_identity': cls.__name__} + + class Person(Base, ComputedMapperArgs): + __tablename__ = 'people' + id = Column(Integer, primary_key=True) + discriminator = Column('type', String(50)) + + class Engineer(Person): + pass + + configure_mappers() + assert class_mapper(Person).polymorphic_on \ + is Person.__table__.c.type + eq_(class_mapper(Engineer).polymorphic_identity, 'Engineer') + + def test_mapper_args_declared_attr_two(self): + + # same as test_mapper_args_declared_attr, but we repeat + # ComputedMapperArgs on both classes for no apparent reason. + + class ComputedMapperArgs: + @declared_attr + def __mapper_args__(cls): + if cls.__name__ == 'Person': + return {'polymorphic_on': cls.discriminator} + else: + return {'polymorphic_identity': cls.__name__} + + class Person(Base, ComputedMapperArgs): + + __tablename__ = 'people' + id = Column(Integer, primary_key=True) + discriminator = Column('type', String(50)) + + class Engineer(Person, ComputedMapperArgs): + pass + + configure_mappers() + assert class_mapper(Person).polymorphic_on \ + is Person.__table__.c.type + eq_(class_mapper(Engineer).polymorphic_identity, 'Engineer') + + def test_table_args_composite(self): + + class MyMixin1: + + __table_args__ = {'info': {'baz': 'bob'}} + + class MyMixin2: + + __table_args__ = {'info': {'foo': 'bar'}} + + class MyModel(Base, MyMixin1, MyMixin2): + + __tablename__ = 'test' + + @declared_attr + def __table_args__(self): + info = {} + args = dict(info=info) + info.update(MyMixin1.__table_args__['info']) + info.update(MyMixin2.__table_args__['info']) + return args + id = Column(Integer, primary_key=True) + + eq_(MyModel.__table__.info, {'foo': 'bar', 'baz': 'bob'}) + + def test_mapper_args_inherited(self): + + class MyMixin: + + __mapper_args__ = {'always_refresh': True} + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + + eq_(MyModel.__mapper__.always_refresh, True) + + def test_mapper_args_inherited_descriptor(self): + + class MyMixin: + + @declared_attr + def __mapper_args__(cls): + + # tenuous, but illustrates the problem! + + if cls.__name__ == 'MyModel': + return dict(always_refresh=True) + else: + return dict(always_refresh=False) + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + + eq_(MyModel.__mapper__.always_refresh, True) + + def test_mapper_args_polymorphic_on_inherited(self): + + class MyMixin: + + type_ = Column(String(50)) + __mapper_args__ = {'polymorphic_on': type_} + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + + col = MyModel.__mapper__.polymorphic_on + eq_(col.name, 'type_') + assert col.table is not None + + def test_mapper_args_overridden(self): + + class MyMixin: + + __mapper_args__ = dict(always_refresh=True) + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + __mapper_args__ = dict(always_refresh=False) + id = Column(Integer, primary_key=True) + + eq_(MyModel.__mapper__.always_refresh, False) + + def test_mapper_args_composite(self): + + class MyMixin1: + + type_ = Column(String(50)) + __mapper_args__ = {'polymorphic_on': type_} + + class MyMixin2: + + __mapper_args__ = {'always_refresh': True} + + class MyModel(Base, MyMixin1, MyMixin2): + + __tablename__ = 'test' + + @declared_attr + def __mapper_args__(cls): + args = {} + args.update(MyMixin1.__mapper_args__) + args.update(MyMixin2.__mapper_args__) + if cls.__name__ != 'MyModel': + args.pop('polymorphic_on') + args['polymorphic_identity'] = cls.__name__ + + return args + id = Column(Integer, primary_key=True) + + class MySubModel(MyModel): + pass + + eq_( + MyModel.__mapper__.polymorphic_on.name, + 'type_' + ) + assert MyModel.__mapper__.polymorphic_on.table is not None + eq_(MyModel.__mapper__.always_refresh, True) + eq_(MySubModel.__mapper__.always_refresh, True) + eq_(MySubModel.__mapper__.polymorphic_identity, 'MySubModel') + + def test_mapper_args_property(self): + class MyModel(Base): + + @declared_attr + def __tablename__(cls): + return cls.__name__.lower() + + @declared_attr + def __table_args__(cls): + return {'mysql_engine':'InnoDB'} + + @declared_attr + def __mapper_args__(cls): + args = {} + args['polymorphic_identity'] = cls.__name__ + return args + id = Column(Integer, primary_key=True) + + class MySubModel(MyModel): + id = Column(Integer, ForeignKey('mymodel.id'), primary_key=True) + + class MySubModel2(MyModel): + __tablename__ = 'sometable' + id = Column(Integer, ForeignKey('mymodel.id'), primary_key=True) + + eq_(MyModel.__mapper__.polymorphic_identity, 'MyModel') + eq_(MySubModel.__mapper__.polymorphic_identity, 'MySubModel') + eq_(MyModel.__table__.kwargs['mysql_engine'], 'InnoDB') + eq_(MySubModel.__table__.kwargs['mysql_engine'], 'InnoDB') + eq_(MySubModel2.__table__.kwargs['mysql_engine'], 'InnoDB') + eq_(MyModel.__table__.name, 'mymodel') + eq_(MySubModel.__table__.name, 'mysubmodel') + + def test_mapper_args_custom_base(self): + """test the @declared_attr approach from a custom base.""" + + class Base(object): + @declared_attr + def __tablename__(cls): + return cls.__name__.lower() + + @declared_attr + def __table_args__(cls): + return {'mysql_engine':'InnoDB'} + + @declared_attr + def id(self): + return Column(Integer, primary_key=True) + + Base = decl.declarative_base(cls=Base) + + class MyClass(Base): + pass + + class MyOtherClass(Base): + pass + + eq_(MyClass.__table__.kwargs['mysql_engine'], 'InnoDB') + eq_(MyClass.__table__.name, 'myclass') + eq_(MyOtherClass.__table__.name, 'myotherclass') + assert MyClass.__table__.c.id.table is MyClass.__table__ + assert MyOtherClass.__table__.c.id.table is MyOtherClass.__table__ + + def test_single_table_no_propagation(self): + + class IdColumn: + + id = Column(Integer, primary_key=True) + + class Generic(Base, IdColumn): + + __tablename__ = 'base' + discriminator = Column('type', String(50)) + __mapper_args__ = dict(polymorphic_on=discriminator) + value = Column(Integer()) + + class Specific(Generic): + + __mapper_args__ = dict(polymorphic_identity='specific') + + assert Specific.__table__ is Generic.__table__ + eq_(Generic.__table__.c.keys(), ['id', 'type', 'value']) + assert class_mapper(Specific).polymorphic_on \ + is Generic.__table__.c.type + eq_(class_mapper(Specific).polymorphic_identity, 'specific') + + def test_joined_table_propagation(self): + + class CommonMixin: + + @declared_attr + def __tablename__(cls): + return cls.__name__.lower() + __table_args__ = {'mysql_engine': 'InnoDB'} + timestamp = Column(Integer) + id = Column(Integer, primary_key=True) + + class Generic(Base, CommonMixin): + + discriminator = Column('python_type', String(50)) + __mapper_args__ = dict(polymorphic_on=discriminator) + + class Specific(Generic): + + __mapper_args__ = dict(polymorphic_identity='specific') + id = Column(Integer, ForeignKey('generic.id'), + primary_key=True) + + eq_(Generic.__table__.name, 'generic') + eq_(Specific.__table__.name, 'specific') + eq_(Generic.__table__.c.keys(), ['timestamp', 'id', + 'python_type']) + eq_(Specific.__table__.c.keys(), ['timestamp', 'id']) + eq_(Generic.__table__.kwargs, {'mysql_engine': 'InnoDB'}) + eq_(Specific.__table__.kwargs, {'mysql_engine': 'InnoDB'}) + + def test_some_propagation(self): + + class CommonMixin: + + @declared_attr + def __tablename__(cls): + return cls.__name__.lower() + __table_args__ = {'mysql_engine': 'InnoDB'} + timestamp = Column(Integer) + + class BaseType(Base, CommonMixin): + + discriminator = Column('type', String(50)) + __mapper_args__ = dict(polymorphic_on=discriminator) + id = Column(Integer, primary_key=True) + value = Column(Integer()) + + class Single(BaseType): + + __tablename__ = None + __mapper_args__ = dict(polymorphic_identity='type1') + + class Joined(BaseType): + + __mapper_args__ = dict(polymorphic_identity='type2') + id = Column(Integer, ForeignKey('basetype.id'), + primary_key=True) + + eq_(BaseType.__table__.name, 'basetype') + eq_(BaseType.__table__.c.keys(), ['timestamp', 'type', 'id', + 'value']) + eq_(BaseType.__table__.kwargs, {'mysql_engine': 'InnoDB'}) + assert Single.__table__ is BaseType.__table__ + eq_(Joined.__table__.name, 'joined') + eq_(Joined.__table__.c.keys(), ['timestamp', 'id']) + eq_(Joined.__table__.kwargs, {'mysql_engine': 'InnoDB'}) + + def test_non_propagating_mixin(self): + + class NoJoinedTableNameMixin: + + @declared_attr + def __tablename__(cls): + if decl.has_inherited_table(cls): + return None + return cls.__name__.lower() + + class BaseType(Base, NoJoinedTableNameMixin): + + discriminator = Column('type', String(50)) + __mapper_args__ = dict(polymorphic_on=discriminator) + id = Column(Integer, primary_key=True) + value = Column(Integer()) + + class Specific(BaseType): + + __mapper_args__ = dict(polymorphic_identity='specific') + + eq_(BaseType.__table__.name, 'basetype') + eq_(BaseType.__table__.c.keys(), ['type', 'id', 'value']) + assert Specific.__table__ is BaseType.__table__ + assert class_mapper(Specific).polymorphic_on \ + is BaseType.__table__.c.type + eq_(class_mapper(Specific).polymorphic_identity, 'specific') + + def test_non_propagating_mixin_used_for_joined(self): + + class TableNameMixin: + + @declared_attr + def __tablename__(cls): + if decl.has_inherited_table(cls) and TableNameMixin \ + not in cls.__bases__: + return None + return cls.__name__.lower() + + class BaseType(Base, TableNameMixin): + + discriminator = Column('type', String(50)) + __mapper_args__ = dict(polymorphic_on=discriminator) + id = Column(Integer, primary_key=True) + value = Column(Integer()) + + class Specific(BaseType, TableNameMixin): + + __mapper_args__ = dict(polymorphic_identity='specific') + id = Column(Integer, ForeignKey('basetype.id'), + primary_key=True) + + eq_(BaseType.__table__.name, 'basetype') + eq_(BaseType.__table__.c.keys(), ['type', 'id', 'value']) + eq_(Specific.__table__.name, 'specific') + eq_(Specific.__table__.c.keys(), ['id']) + + def test_single_back_propagate(self): + + class ColumnMixin: + + timestamp = Column(Integer) + + class BaseType(Base): + + __tablename__ = 'foo' + discriminator = Column('type', String(50)) + __mapper_args__ = dict(polymorphic_on=discriminator) + id = Column(Integer, primary_key=True) + + class Specific(BaseType, ColumnMixin): + + __mapper_args__ = dict(polymorphic_identity='specific') + + eq_(BaseType.__table__.c.keys(), ['type', 'id', 'timestamp']) + + def test_table_in_model_and_same_column_in_mixin(self): + + class ColumnMixin: + + data = Column(Integer) + + class Model(Base, ColumnMixin): + + __table__ = Table('foo', Base.metadata, Column('data', + Integer), Column('id', Integer, + primary_key=True)) + + model_col = Model.__table__.c.data + mixin_col = ColumnMixin.data + assert model_col is not mixin_col + eq_(model_col.name, 'data') + assert model_col.type.__class__ is mixin_col.type.__class__ + + def test_table_in_model_and_different_named_column_in_mixin(self): + + class ColumnMixin: + tada = Column(Integer) + + def go(): + + class Model(Base, ColumnMixin): + + __table__ = Table('foo', Base.metadata, + Column('data',Integer), + Column('id', Integer,primary_key=True)) + foo = relationship("Dest") + + assert_raises_message(sa.exc.ArgumentError, + "Can't add additional column 'tada' when " + "specifying __table__", go) + + def test_table_in_model_and_different_named_alt_key_column_in_mixin(self): + + # here, the __table__ has a column 'tada'. We disallow + # the add of the 'foobar' column, even though it's + # keyed to 'tada'. + + class ColumnMixin: + tada = Column('foobar', Integer) + + def go(): + + class Model(Base, ColumnMixin): + + __table__ = Table('foo', Base.metadata, + Column('data',Integer), + Column('tada', Integer), + Column('id', Integer,primary_key=True)) + foo = relationship("Dest") + + assert_raises_message(sa.exc.ArgumentError, + "Can't add additional column 'foobar' when " + "specifying __table__", go) + + def test_table_in_model_overrides_different_typed_column_in_mixin(self): + + class ColumnMixin: + + data = Column(String) + + class Model(Base, ColumnMixin): + + __table__ = Table('foo', Base.metadata, Column('data', + Integer), Column('id', Integer, + primary_key=True)) + + model_col = Model.__table__.c.data + mixin_col = ColumnMixin.data + assert model_col is not mixin_col + eq_(model_col.name, 'data') + assert model_col.type.__class__ is Integer + + def test_mixin_column_ordering(self): + + class Foo(object): + + col1 = Column(Integer) + col3 = Column(Integer) + + class Bar(object): + + col2 = Column(Integer) + col4 = Column(Integer) + + class Model(Base, Foo, Bar): + + id = Column(Integer, primary_key=True) + __tablename__ = 'model' + + eq_(Model.__table__.c.keys(), ['col1', 'col3', 'col2', 'col4', + 'id']) + + def test_honor_class_mro_one(self): + class HasXMixin(object): + @declared_attr + def x(self): + return Column(Integer) + + class Parent(HasXMixin, Base): + __tablename__ = 'parent' + id = Column(Integer, primary_key=True) + + class Child(Parent): + __tablename__ = 'child' + id = Column(Integer, ForeignKey('parent.id'), primary_key=True) + + assert "x" not in Child.__table__.c + + def test_honor_class_mro_two(self): + class HasXMixin(object): + @declared_attr + def x(self): + return Column(Integer) + + class Parent(HasXMixin, Base): + __tablename__ = 'parent' + id = Column(Integer, primary_key=True) + def x(self): + return "hi" + + class C(Parent): + __tablename__ = 'c' + id = Column(Integer, ForeignKey('parent.id'), primary_key=True) + + assert C().x() == 'hi' + + +class DeclarativeMixinPropertyTest(DeclarativeTestBase): + + def test_column_property(self): + + class MyMixin(object): + + @declared_attr + def prop_hoho(cls): + return column_property(Column('prop', String(50))) + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + + class MyOtherModel(Base, MyMixin): + + __tablename__ = 'othertest' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + + assert MyModel.__table__.c.prop is not None + assert MyOtherModel.__table__.c.prop is not None + assert MyModel.__table__.c.prop \ + is not MyOtherModel.__table__.c.prop + assert MyModel.prop_hoho.property.columns \ + == [MyModel.__table__.c.prop] + assert MyOtherModel.prop_hoho.property.columns \ + == [MyOtherModel.__table__.c.prop] + assert MyModel.prop_hoho.property \ + is not MyOtherModel.prop_hoho.property + Base.metadata.create_all() + sess = create_session() + m1, m2 = MyModel(prop_hoho='foo'), MyOtherModel(prop_hoho='bar') + sess.add_all([m1, m2]) + sess.flush() + eq_(sess.query(MyModel).filter(MyModel.prop_hoho == 'foo' + ).one(), m1) + eq_(sess.query(MyOtherModel).filter(MyOtherModel.prop_hoho + == 'bar').one(), m2) + + def test_doc(self): + """test documentation transfer. + + the documentation situation with @declared_attr is problematic. + at least see if mapped subclasses get the doc. + + """ + + class MyMixin(object): + + @declared_attr + def type_(cls): + """this is a document.""" + + return Column(String(50)) + + @declared_attr + def t2(cls): + """this is another document.""" + + return column_property(Column(String(50))) + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + + configure_mappers() + eq_(MyModel.type_.__doc__, """this is a document.""") + eq_(MyModel.t2.__doc__, """this is another document.""") + + def test_column_in_mapper_args(self): + + class MyMixin(object): + + @declared_attr + def type_(cls): + return Column(String(50)) + __mapper_args__ = {'polymorphic_on': type_} + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + id = Column(Integer, primary_key=True) + + configure_mappers() + col = MyModel.__mapper__.polymorphic_on + eq_(col.name, 'type_') + assert col.table is not None + + def test_deferred(self): + + class MyMixin(object): + + @declared_attr + def data(cls): + return deferred(Column('data', String(50))) + + class MyModel(Base, MyMixin): + + __tablename__ = 'test' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + + Base.metadata.create_all() + sess = create_session() + sess.add_all([MyModel(data='d1'), MyModel(data='d2')]) + sess.flush() + sess.expunge_all() + d1, d2 = sess.query(MyModel).order_by(MyModel.data) + assert 'data' not in d1.__dict__ + assert d1.data == 'd1' + assert 'data' in d1.__dict__ + + def _test_relationship(self, usestring): + + class RefTargetMixin(object): + + @declared_attr + def target_id(cls): + return Column('target_id', ForeignKey('target.id')) + if usestring: + + @declared_attr + def target(cls): + return relationship('Target', + primaryjoin='Target.id==%s.target_id' + % cls.__name__) + else: + + @declared_attr + def target(cls): + return relationship('Target') + + class Foo(Base, RefTargetMixin): + + __tablename__ = 'foo' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + + class Bar(Base, RefTargetMixin): + + __tablename__ = 'bar' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + + class Target(Base): + + __tablename__ = 'target' + id = Column(Integer, primary_key=True, + test_needs_autoincrement=True) + + Base.metadata.create_all() + sess = create_session() + t1, t2 = Target(), Target() + f1, f2, b1 = Foo(target=t1), Foo(target=t2), Bar(target=t1) + sess.add_all([f1, f2, b1]) + sess.flush() + eq_(sess.query(Foo).filter(Foo.target == t2).one(), f2) + eq_(sess.query(Bar).filter(Bar.target == t2).first(), None) + sess.expire_all() + eq_(f1.target, t1) + + def test_relationship(self): + self._test_relationship(False) + + def test_relationship_primryjoin(self): + self._test_relationship(True) + diff --git a/test/ext/test_declarative_reflection.py b/test/ext/test_declarative_reflection.py new file mode 100644 index 0000000000..28aa47c927 --- /dev/null +++ b/test/ext/test_declarative_reflection.py @@ -0,0 +1,218 @@ +from test.lib.testing import eq_, assert_raises +from sqlalchemy.ext import declarative as decl +from test.lib import testing +from sqlalchemy import MetaData, Integer, String, ForeignKey +from test.lib.schema import Table, Column +from sqlalchemy.orm import relationship, create_session, \ + clear_mappers, \ + Session +from test.lib import fixtures + +class DeclarativeReflectionBase(fixtures.TablesTest): + def setup(self): + global Base + Base = decl.declarative_base(testing.db) + +class DeclarativeReflectionTest(DeclarativeReflectionBase): + + @classmethod + def define_tables(cls, metadata): + Table('users', metadata, + Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column('name', String(50)), test_needs_fk=True) + Table( + 'addresses', + metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('email', String(50)), + Column('user_id', Integer, ForeignKey('users.id')), + test_needs_fk=True, + ) + Table( + 'imhandles', + metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('user_id', Integer), + Column('network', String(50)), + Column('handle', String(50)), + test_needs_fk=True, + ) + + def test_basic(self): + meta = MetaData(testing.db) + + class User(Base, fixtures.ComparableEntity): + + __tablename__ = 'users' + __autoload__ = True + if testing.against('oracle', 'firebird'): + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + addresses = relationship('Address', backref='user') + + class Address(Base, fixtures.ComparableEntity): + + __tablename__ = 'addresses' + __autoload__ = True + if testing.against('oracle', 'firebird'): + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + + u1 = User(name='u1', addresses=[Address(email='one'), + Address(email='two')]) + sess = create_session() + sess.add(u1) + sess.flush() + sess.expunge_all() + eq_(sess.query(User).all(), [User(name='u1', + addresses=[Address(email='one'), Address(email='two')])]) + a1 = sess.query(Address).filter(Address.email == 'two').one() + eq_(a1, Address(email='two')) + eq_(a1.user, User(name='u1')) + + def test_rekey(self): + meta = MetaData(testing.db) + + class User(Base, fixtures.ComparableEntity): + + __tablename__ = 'users' + __autoload__ = True + if testing.against('oracle', 'firebird'): + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + nom = Column('name', String(50), key='nom') + addresses = relationship('Address', backref='user') + + class Address(Base, fixtures.ComparableEntity): + + __tablename__ = 'addresses' + __autoload__ = True + if testing.against('oracle', 'firebird'): + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + + u1 = User(nom='u1', addresses=[Address(email='one'), + Address(email='two')]) + sess = create_session() + sess.add(u1) + sess.flush() + sess.expunge_all() + eq_(sess.query(User).all(), [User(nom='u1', + addresses=[Address(email='one'), Address(email='two')])]) + a1 = sess.query(Address).filter(Address.email == 'two').one() + eq_(a1, Address(email='two')) + eq_(a1.user, User(nom='u1')) + assert_raises(TypeError, User, name='u3') + + def test_supplied_fk(self): + meta = MetaData(testing.db) + + class IMHandle(Base, fixtures.ComparableEntity): + + __tablename__ = 'imhandles' + __autoload__ = True + if testing.against('oracle', 'firebird'): + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + user_id = Column('user_id', Integer, ForeignKey('users.id')) + + class User(Base, fixtures.ComparableEntity): + + __tablename__ = 'users' + __autoload__ = True + if testing.against('oracle', 'firebird'): + id = Column('id', Integer, primary_key=True, + test_needs_autoincrement=True) + handles = relationship('IMHandle', backref='user') + + u1 = User(name='u1', handles=[IMHandle(network='blabber', + handle='foo'), IMHandle(network='lol', handle='zomg' + )]) + sess = create_session() + sess.add(u1) + sess.flush() + sess.expunge_all() + eq_(sess.query(User).all(), [User(name='u1', + handles=[IMHandle(network='blabber', handle='foo'), + IMHandle(network='lol', handle='zomg')])]) + a1 = sess.query(IMHandle).filter(IMHandle.handle == 'zomg' + ).one() + eq_(a1, IMHandle(network='lol', handle='zomg')) + eq_(a1.user, User(name='u1')) + +class DeferredReflectBase(DeclarativeReflectionBase): + def teardown(self): + super(DeferredReflectBase,self).teardown() + from sqlalchemy.ext.declarative import _MapperConfig + _MapperConfig.configs.clear() + +class DeferredReflectionTest(DeferredReflectBase): + + @classmethod + def define_tables(cls, metadata): + Table('users', metadata, + Column('id', Integer, + primary_key=True, test_needs_autoincrement=True), + Column('name', String(50)), test_needs_fk=True) + Table( + 'addresses', + metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('email', String(50)), + Column('user_id', Integer, ForeignKey('users.id')), + test_needs_fk=True, + ) + + def _roundtrip(self): + + User = Base._decl_class_registry['User'] + Address = Base._decl_class_registry['Address'] + + u1 = User(name='u1', addresses=[Address(email='one'), + Address(email='two')]) + sess = create_session() + sess.add(u1) + sess.flush() + sess.expunge_all() + eq_(sess.query(User).all(), [User(name='u1', + addresses=[Address(email='one'), Address(email='two')])]) + a1 = sess.query(Address).filter(Address.email == 'two').one() + eq_(a1, Address(email='two')) + eq_(a1.user, User(name='u1')) + + def test_basic_deferred(self): + class User(decl.DeferredReflection, fixtures.ComparableEntity, + Base): + __tablename__ = 'users' + addresses = relationship("Address", backref="user") + + class Address(decl.DeferredReflection, fixtures.ComparableEntity, + Base): + __tablename__ = 'addresses' + + decl.DeferredReflection.prepare(testing.db) + self._roundtrip() + + def test_abstract_base(self): + class DefBase(decl.DeferredReflection, Base): + __abstract__ = True + + class OtherDefBase(decl.DeferredReflection, Base): + __abstract__ = True + + class User(fixtures.ComparableEntity, DefBase): + __tablename__ = 'users' + addresses = relationship("Address", backref="user") + + class Address(fixtures.ComparableEntity, DefBase): + __tablename__ = 'addresses' + + class Fake(OtherDefBase): + __tablename__ = 'nonexistent' + + DefBase.prepare(testing.db) + self._roundtrip()